You are viewing a plain text version of this content; the canonical link to the original is not preserved in this rendering.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/08 02:12:48 UTC
[incubator-seatunnel] branch dev updated: [Improve][all] change Log to @Slf4j (#3001)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6016100f1 [Improve][all] change Log to @Slf4j (#3001)
6016100f1 is described below
commit 6016100f123cbf32f8b945155aef6b578be462ec
Author: liugddx <80...@qq.com>
AuthorDate: Sat Oct 8 10:12:42 2022 +0800
[Improve][all] change Log to @Slf4j (#3001)
* [Improve][all] change Log to @Slf4j
---
.../apache/seatunnel/flink/FlinkEnvironment.java | 12 ++++------
.../seatunnel/flink/batch/FlinkBatchExecution.java | 12 ++++------
.../flink/stream/FlinkStreamExecution.java | 10 ++++----
.../seatunnel/flink/util/EnvironmentUtil.java | 10 ++++----
.../apache/seatunnel/spark/SparkEnvironment.java | 8 +++----
.../sink/client/ClickhouseSinkWriter.java | 6 ++---
.../sink/file/ClickhouseFileSinkWriter.java | 9 ++++----
.../clickhouse/sink/file/RsyncFileTransfer.java | 12 ++++------
.../clickhouse/sink/file/ScpFileTransfer.java | 8 +++----
.../sink/ElasticsearchSinkWriter.java | 9 +++-----
.../seatunnel/email/sink/EmailSinkWriter.java | 14 +++++------
.../seatunnel/fake/source/FakeSourceReader.java | 8 +++----
.../sink/commit/FileSinkAggregatedCommitter.java | 9 ++++----
.../seatunnel/file/sink/util/FileSystemUtils.java | 11 ++++-----
.../seatunnel/http/sink/HttpSinkWriter.java | 9 ++++----
.../seatunnel/http/source/HttpSourceReader.java | 11 ++++-----
.../kafka/sink/KafkaInternalProducer.java | 8 +++----
.../seatunnel/kafka/sink/KafkaSinkCommitter.java | 10 ++++----
.../kafka/sink/KafkaTransactionSender.java | 10 ++++----
.../seatunnel/kafka/source/KafkaSourceReader.java | 6 +----
.../seatunnel/kudu/kuduclient/KuduInputFormat.java | 15 ++++++------
.../kudu/kuduclient/KuduOutputFormat.java | 16 ++++++-------
.../seatunnel/kudu/sink/KuduSinkWriter.java | 8 ++-----
.../seatunnel/kudu/source/KuduSource.java | 10 ++++----
.../seatunnel/kudu/source/KuduSourceReader.java | 8 +++----
.../mongodb/source/MongodbSourceReader.java | 8 +++----
.../socket/source/SocketSourceReader.java | 7 +++---
.../sink/ClickhouseFileOutputFormat.java | 9 ++++----
.../seatunnel/flink/console/sink/ConsoleSink.java | 7 +++---
.../flink/doris/sink/DorisOutputFormat.java | 7 +++---
.../flink/doris/sink/DorisStreamLoad.java | 9 ++++----
.../flink/druid/sink/DruidOutputFormat.java | 7 +++---
.../seatunnel/flink/druid/source/DruidSource.java | 8 +++----
.../sink/ElasticsearchOutputFormat.java | 7 +++---
.../sink/ElasticsearchOutputFormat.java | 7 +++---
.../apache/seatunnel/flink/file/sink/FileSink.java | 8 +++----
.../apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 17 +++++++-------
.../seatunnel/flink/jdbc/source/JdbcSource.java | 9 ++++----
.../flink/kafka/source/KafkaTableStream.java | 7 +++---
.../org/apache/seatunnel/core/base/Seatunnel.java | 27 +++++++++++-----------
.../core/base/command/BaseTaskExecuteCommand.java | 16 ++++++-------
.../seatunnel/core/base/config/ConfigBuilder.java | 11 ++++-----
.../core/base/config/ExecutionFactory.java | 8 +++----
.../seatunnel/core/base/utils/AsciiArtUtils.java | 8 +++----
.../core/base/utils/CompressionUtils.java | 22 ++++++++----------
.../core/sql/classloader/CustomClassLoader.java | 8 +++----
.../apache/seatunnel/core/sql/job/Executor.java | 14 +++++------
.../flink/command/FlinkApiConfValidateCommand.java | 8 +++----
.../spark/command/SparkConfValidateCommand.java | 8 +++----
.../apache/seatunnel/core/starter/Seatunnel.java | 27 +++++++++++-----------
.../core/starter/config/ConfigBuilder.java | 11 ++++-----
.../seatunnel/core/starter/config/EngineType.java | 3 +--
.../core/starter/utils/AsciiArtUtils.java | 8 +++----
.../core/starter/utils/CompressionUtils.java | 22 ++++++++----------
.../flink/command/FlinkApiConfValidateCommand.java | 8 +++----
.../flink/command/FlinkApiTaskExecuteCommand.java | 6 ++---
.../starter/flink/execution/FlinkExecution.java | 8 +++----
.../spark/command/SparkApiConfValidateCommand.java | 9 ++++----
.../spark/command/SparkApiTaskExecuteCommand.java | 8 +++----
.../starter/spark/execution/SparkExecution.java | 7 +++---
.../seatunnel/command/ApiConfValidateCommand.java | 8 +++----
.../jdbc/internal/xa/XaGroupOpsImplIT.java | 7 +++---
.../seatunnel/engine/e2e/JobExecutionIT.java | 6 ++---
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 9 ++++----
.../seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java | 9 ++++----
.../e2e/flink/v2/jdbc/JdbcPostgresIT.java | 7 ++----
.../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java | 9 ++++----
.../flink/clickhouse/FakeSourceToClickhouseIT.java | 10 ++++----
.../e2e/flink/file/FakeSourceToFileIT.java | 6 ++---
.../seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java | 9 ++++----
.../e2e/spark/v2/jdbc/JdbcPostgresIT.java | 9 ++++----
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 9 ++++----
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 9 ++++----
seatunnel-engine/seatunnel-engine-common/pom.xml | 4 ++--
.../plugin/discovery/AbstractPluginDiscovery.java | 19 ++++++++-------
.../org/apache/seatunnel/flink/transform/UDF.java | 11 ++++-----
.../translation/flink/sink/FlinkCommitter.java | 8 +++----
.../flink/sink/FlinkGlobalCommitter.java | 8 +++----
.../source/batch/ParallelBatchPartitionReader.java | 8 +++----
79 files changed, 324 insertions(+), 454 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index a33b81930..826e33830 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -46,8 +47,6 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.TernaryBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
@@ -55,10 +54,9 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
+@Slf4j
public class FlinkEnvironment implements RuntimeEnv {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkEnvironment.class);
-
private Config config;
private StreamExecutionEnvironment environment;
@@ -127,7 +125,7 @@ public class FlinkEnvironment implements RuntimeEnv {
@Override
public void registerPlugin(List<URL> pluginPaths) {
- pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
+ pluginPaths.forEach(url -> log.info("register plugins : {}", url));
List<Configuration> configurations = new ArrayList<>();
try {
configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
@@ -248,7 +246,7 @@ public class FlinkEnvironment implements RuntimeEnv {
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
break;
default:
- LOGGER.warn(
+ log.warn(
"set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
timeType);
break;
@@ -272,7 +270,7 @@ public class FlinkEnvironment implements RuntimeEnv {
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
break;
default:
- LOGGER.warn(
+ log.warn(
"set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
mode);
break;
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index a0ff40894..bb8060e38 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -23,22 +23,20 @@ import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+@Slf4j
public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkBatchExecution.class);
-
private Config config;
private final FlinkEnvironment flinkEnvironment;
@@ -73,11 +71,11 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
if (whetherExecute(sinks)) {
try {
- LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
+ log.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
- LOGGER.info(execute.toString());
+ log.info(execute.toString());
} catch (Exception e) {
- LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+ log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
throw e;
}
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index d5fef8ed6..8672171cd 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -24,21 +24,19 @@ import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+@Slf4j
public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStreamExecution.class);
-
private Config config;
private final FlinkEnvironment flinkEnvironment;
@@ -71,10 +69,10 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
sink.outputStream(flinkEnvironment, stream);
}
try {
- LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+ log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
} catch (Exception e) {
- LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+ log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
throw e;
}
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
index e23e26fe1..768d9d75d 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
@@ -21,18 +21,16 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
+@Slf4j
public final class EnvironmentUtil {
- private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentUtil.class);
-
private EnvironmentUtil() {
}
@@ -58,11 +56,11 @@ public final class EnvironmentUtil {
Time.of(delayInterval, TimeUnit.MILLISECONDS)));
break;
default:
- LOGGER.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
+ log.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
}
}
} catch (Exception e) {
- LOGGER.warn("set restart.strategy in config '{}' exception", config, e);
+ log.warn("set restart.strategy in config '{}' exception", config, e);
}
}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index b9758babe..b003f6c48 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -28,22 +28,20 @@ import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.List;
+@Slf4j
public class SparkEnvironment implements RuntimeEnv {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkEnvironment.class);
-
private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
private SparkConf sparkConf;
@@ -92,7 +90,7 @@ public class SparkEnvironment implements RuntimeEnv {
@Override
public void registerPlugin(List<URL> pluginPaths) {
- LOGGER.info("register plugins :" + pluginPaths);
+ log.info("register plugins :" + pluginPaths);
// TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor it to
// support submit multi-jar in code or remove this logic.
// this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(",")));
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index cc38ca0c0..27359ee32 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -39,9 +39,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.PreparedStatement;
@@ -54,10 +53,9 @@ import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+@Slf4j
public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
-
private final Context context;
private final ReaderOption option;
private final ShardRouter shardRouter;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index c19fdae34..ac5900915 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -30,9 +30,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSink
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
@@ -55,8 +54,8 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+@Slf4j
public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
private static final int UUID_LENGTH = 10;
private final FileReaderOption readerOption;
@@ -178,7 +177,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKComm
uuid));
command.add("--path");
command.add("\"" + clickhouseLocalFile + "\"");
- LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+ log.info("Generate clickhouse local file command: {}", String.join(" ", command));
ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
Process start = processBuilder.start();
// we just wait for the process to finish
@@ -187,7 +186,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKComm
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
String line;
while ((line = bufferedReader.readLine()) != null) {
- LOGGER.info(line);
+ log.info(line);
}
}
start.waitFor();
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
index 9d1a888c2..4dd8bc766 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
@@ -17,11 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@@ -30,10 +29,9 @@ import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
public class RsyncFileTransfer implements FileTransfer {
- private static final Logger LOGGER = LoggerFactory.getLogger(RsyncFileTransfer.class);
-
private static final int SSH_PORT = 22;
private final String host;
@@ -84,7 +82,7 @@ public class RsyncFileTransfer implements FileTransfer {
rsyncCommand.add(sshParameter);
rsyncCommand.add(sourcePath);
rsyncCommand.add(String.format("root@%s:%s", host, targetPath));
- LOGGER.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
+ log.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", rsyncCommand));
Process start = processBuilder.start();
// we just wait for the process to finish
@@ -93,7 +91,7 @@ public class RsyncFileTransfer implements FileTransfer {
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
String line;
while ((line = bufferedReader.readLine()) != null) {
- LOGGER.info(line);
+ log.info(line);
}
}
start.waitFor();
@@ -110,7 +108,7 @@ public class RsyncFileTransfer implements FileTransfer {
command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
try {
String finalCommand = String.join(" ", command);
- LOGGER.info("execute remote command: " + finalCommand);
+ log.info("execute remote command: " + finalCommand);
clientSession.executeRemoteCommand(finalCommand);
} catch (IOException e) {
// always return error cause xargs return shell command result
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
index 53347457d..1c207276a 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -17,22 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.scp.client.ScpClient;
import org.apache.sshd.scp.client.ScpClientCreator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
public class ScpFileTransfer implements FileTransfer {
- private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
-
private static final int SCP_PORT = 22;
private final String host;
@@ -90,7 +88,7 @@ public class ScpFileTransfer implements FileTransfer {
command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
try {
String finalCommand = String.join(" ", command);
- LOGGER.info("execute remote command: " + finalCommand);
+ log.info("execute remote command: " + finalCommand);
clientSession.executeRemoteCommand(finalCommand);
} catch (IOException e) {
// always return error cause xargs return shell command result
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 8754d7567..6a2c32cd6 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -33,8 +33,7 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.Elasticsear
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
@@ -44,6 +43,7 @@ import java.util.Optional;
/**
* ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
*/
+@Slf4j
public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkStateT> {
private final SinkWriter.Context context;
@@ -52,9 +52,6 @@ public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWri
private final List<String> requestEsList;
private EsRestClient esRestClient;
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
-
public ElasticsearchSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
@@ -115,7 +112,7 @@ public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWri
if (tryCnt == maxRetry) {
throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
}
- LOGGER.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+ log.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
}
}
diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index 1a86dce22..2832b7d40 100644
--- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -25,8 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.sun.mail.util.MailSSLSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import javax.activation.DataHandler;
import javax.activation.DataSource;
@@ -48,10 +47,9 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
+@Slf4j
public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private static final Logger LOGGER = LoggerFactory.getLogger(EmailSinkWriter.class);
-
private final SeaTunnelRowType seaTunnelRowType;
private EmailSinkConfig config;
private StringBuffer stringBuffer;
@@ -130,9 +128,9 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
// send a message
Transport.send(message);
- LOGGER.info("Sent message successfully....");
+ log.info("Sent message successfully....");
} catch (Exception e) {
- LOGGER.warn("send email Fail.", e);
+ log.warn("send email Fail.", e);
throw new RuntimeException("send email Fail.", e);
}
}
@@ -148,9 +146,9 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
FileWriter fileWritter = new FileWriter(file.getName());
fileWritter.write(data);
fileWritter.close();
- LOGGER.info("Create File successfully....");
+ log.info("Create File successfully....");
} catch (IOException e) {
- LOGGER.warn("Create File Fail.", e);
+ log.warn("Create File Fail.", e);
throw new RuntimeException("Create File Fail.", e);
}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index b847838c0..2dbec493b 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -23,15 +23,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.List;
+@Slf4j
public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
-
private final SingleSplitReaderContext context;
private final FakeDataGenerator fakeDataGenerator;
@@ -61,7 +59,7 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
- LOGGER.info("Closed the bounded fake source");
+ log.info("Closed the bounded fake source");
context.signalNoMoreElement();
}
Thread.sleep(1000L);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 06e8a4e5a..de3354642 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -20,8 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,8 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@Slf4j
public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
@Override
public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
@@ -46,7 +45,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
FileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
- LOGGER.error("commit aggregatedCommitInfo error ", e);
+ log.error("commit aggregatedCommitInfo error ", e);
errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
}
});
@@ -100,7 +99,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
FileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
- LOGGER.error("abort aggregatedCommitInfo error ", e);
+ log.error("abort aggregatedCommitInfo error ", e);
}
});
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index a788d6222..c37874b12 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -18,13 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -32,8 +31,8 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
public class FileSystemUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemUtils.class);
public static final int WRITE_BUFFER_SIZE = 2048;
@@ -79,14 +78,14 @@ public class FileSystemUtils {
*/
public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
FileSystem fileSystem = getFileSystem(newName);
- LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
+ log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
Path oldPath = new Path(oldName);
Path newPath = new Path(newName);
if (rmWhenExist) {
if (fileExist(newName) && fileExist(oldName)) {
fileSystem.delete(newPath, true);
- LOGGER.info("Delete already file: {}", newPath);
+ log.info("Delete already file: {}", newPath);
}
}
if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
@@ -94,7 +93,7 @@ public class FileSystemUtils {
}
if (fileSystem.rename(oldPath, newPath)) {
- LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
+ log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
} else {
throw new IOException("rename file :[" + oldPath + "] to [" + newPath + "] error");
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 2def75ba7..d6980e6a9 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -26,14 +26,13 @@ import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Objects;
+@Slf4j
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkWriter.class);
protected final HttpClientProvider httpClient;
protected final SeaTunnelRowType seaTunnelRowType;
protected final HttpParameter httpParameter;
@@ -62,9 +61,9 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
if (HttpResponse.STATUS_OK == response.getCode()) {
return;
}
- LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
+ log.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 0419e2ec3..1582c6442 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,14 +28,13 @@ import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import com.google.common.base.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Objects;
+@Slf4j
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
- private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
protected HttpClientProvider httpClient;
@@ -70,13 +69,13 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
}
return;
}
- LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
+ log.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
} catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
} finally {
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
- LOGGER.info("Closed the bounded http source");
+ log.info("Closed the bounded http source");
context.signalNoMoreElement();
} else {
if (httpParameter.getPollIntervalMillis() > 0) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 77d2535f5..688e1284b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -19,11 +19,10 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.common.utils.ReflectionUtils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.errors.ProducerFencedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
@@ -34,10 +33,9 @@ import java.util.Properties;
/**
* A {@link KafkaProducer} that allow resume transaction from transactionId
*/
+@Slf4j
public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInternalProducer.class);
-
private static final String TRANSACTION_MANAGER_STATE_ENUM =
"org.apache.kafka.clients.producer.internals.TransactionManager$State";
private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
@@ -97,7 +95,7 @@ public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
public void resumeTransaction(long producerId, short epoch) {
- LOGGER.info(
+ log.info(
"Attempting to resume transaction {} with producerId {} and epoch {}",
transactionalId,
producerId,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index aafa0bd4f..8e6740e03 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -22,18 +22,16 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
+@Slf4j
public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkCommitter.class);
-
private final Config pluginConfig;
private KafkaInternalProducer<?, ?> kafkaProducer;
@@ -49,8 +47,8 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
}
for (KafkaCommitInfo commitInfo : commitInfos) {
String transactionId = commitInfo.getTransactionId();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Committing transaction {}", transactionId);
+ if (log.isDebugEnabled()) {
+ log.debug("Committing transaction {}", transactionId);
}
KafkaProducer<?, ?> producer = getProducer(commitInfo);
producer.commitTransaction();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index ee8762ac4..a3eaba00b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -23,11 +23,10 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
@@ -39,10 +38,9 @@ import java.util.Properties;
* @param <K> key type.
* @param <V> value type.
*/
+@Slf4j
public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTransactionSender.class);
-
private KafkaInternalProducer<K, V> kafkaProducer;
private String transactionId;
private final String transactionPrefix;
@@ -91,8 +89,8 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
for (long i = checkpointId; ; i++) {
String transactionId = generateTransactionId(this.transactionPrefix, i);
producer.setTransactionalId(transactionId);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Abort kafka transaction: {}", transactionId);
+ if (log.isDebugEnabled()) {
+ log.debug("Abort kafka transaction: {}", transactionId);
}
producer.flush();
if (producer.getEpoch() == 0) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index dc9c62093..bb5486956 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -31,8 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
@@ -54,8 +52,6 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
private static final long THREAD_WAIT_TIME = 500L;
private static final long POLL_TIMEOUT = 10000L;
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSourceReader.class);
-
private final SourceReader.Context context;
private final ConsumerMetadata metadata;
private final Set<KafkaSourceSplit> sourceSplits;
@@ -171,7 +167,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
@Override
public void handleNoMoreSplits() {
- LOGGER.info("receive no more splits message, this reader will not add new split.");
+ log.info("receive no more splits message, this reader will not add new split.");
}
@Override
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index d00cc2bd3..cb8a59907 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.ExceptionUtil;
import org.apache.seatunnel.common.constants.PluginType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
@@ -33,8 +34,6 @@ import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -44,8 +43,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+@Slf4j
public class KuduInputFormat implements Serializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(KuduInputFormat.class);
public KuduInputFormat(String kuduMaster, String tableName, String columnsList) {
this.kuduMaster = kuduMaster;
@@ -80,7 +79,7 @@ public class KuduInputFormat implements Serializable {
keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
columns = schema.getColumns();
} catch (KuduException e) {
- LOGGER.warn("get table Columns Schemas Fail.", e);
+ log.warn("get table Columns Schemas Fail.", e);
throw new RuntimeException("get table Columns Schemas Fail..", e);
}
return columns;
@@ -136,7 +135,7 @@ public class KuduInputFormat implements Serializable {
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}
} catch (Exception e) {
- LOGGER.warn("get row type info exception.", e);
+ log.warn("get row type info exception.", e);
throw new PrepareFailException("kudu", PluginType.SOURCE, ExceptionUtil.getMessage(e));
}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
@@ -149,7 +148,7 @@ public class KuduInputFormat implements Serializable {
kuduClient = kuduClientBuilder.build();
- LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+ log.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
}
@@ -180,7 +179,7 @@ public class KuduInputFormat implements Serializable {
kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
.addPredicate(upperPred).build();
} catch (KuduException e) {
- LOGGER.warn("get the Kuduscan object for each splice exception", e);
+ log.warn("get the Kuduscan object for each splice exception", e);
throw new RuntimeException("get the Kuduscan object for each splice exception.", e);
}
return kuduScanner;
@@ -191,7 +190,7 @@ public class KuduInputFormat implements Serializable {
try {
kuduClient.close();
} catch (KuduException e) {
- LOGGER.warn("Kudu Client close failed.", e);
+ log.warn("Kudu Client close failed.", e);
throw new RuntimeException("Kudu Client close failed.", e);
} finally {
kuduClient = null;
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index 909940f49..c356baabf 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.Insert;
@@ -30,8 +31,6 @@ import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -41,11 +40,10 @@ import java.sql.Timestamp;
/**
* A Kudu outputFormat
*/
+@Slf4j
public class KuduOutputFormat
implements Serializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
-
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;
@@ -130,7 +128,7 @@ public class KuduOutputFormat
try {
kuduSession.apply(upsert);
} catch (KuduException e) {
- LOGGER.error("Failed to upsert.", e);
+ log.error("Failed to upsert.", e);
throw new RuntimeException("Failed to upsert.", e);
}
}
@@ -143,7 +141,7 @@ public class KuduOutputFormat
try {
kuduSession.apply(insert);
} catch (KuduException e) {
- LOGGER.error("Failed to insert.", e);
+ log.error("Failed to insert.", e);
throw new RuntimeException("Failed to insert.", e);
}
}
@@ -172,10 +170,10 @@ public class KuduOutputFormat
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
- LOGGER.error("Failed to initialize the Kudu client.", e);
+ log.error("Failed to initialize the Kudu client.", e);
throw new RuntimeException("Failed to initialize the Kudu client.", e);
}
- LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
+ log.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
}
public void closeOutputFormat() {
@@ -184,7 +182,7 @@ public class KuduOutputFormat
kuduClient.close();
kuduSession.close();
} catch (KuduException ignored) {
- LOGGER.warn("Failed to close Kudu Client.", ignored);
+ log.warn("Failed to close Kudu Client.", ignored);
} finally {
kuduClient = null;
kuduSession = null;
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
index d78f48618..994321c8a 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -26,20 +26,16 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputForma
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.NonNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+@Slf4j
public class KuduSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KuduSinkWriter.class);
private SeaTunnelRowType seaTunnelRowType;
private Config pluginConfig;
-
-
private KuduOutputFormat fileWriter;
-
private KuduSinkConfig kuduSinkConfig;
public KuduSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index c10796a21..db11bac07 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -36,22 +36,20 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
@AutoService(SeaTunnelSource.class)
public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
-
private SeaTunnelRowType rowTypeInfo;
private KuduInputFormat kuduInputFormat;
private PartitionParameter partitionParameter;
@@ -85,7 +83,7 @@ public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
@Override
public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator(
- SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
+ SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
// todo:
return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
}
@@ -176,7 +174,7 @@ public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
}
} catch (Exception e) {
- LOGGER.warn("get row type info exception", e);
+ log.warn("get row type info exception", e);
throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString());
}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
index e3c3cd25d..05e621bad 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
@@ -23,21 +23,19 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+@Slf4j
public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSplit> {
- private static final Logger LOGGER = LoggerFactory.getLogger(KuduSourceReader.class);
-
private final SourceReader.Context context;
private final KuduInputFormat kuduInputFormat;
@@ -79,7 +77,7 @@ public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSp
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
- LOGGER.info("Closed the bounded fake source");
+ log.info("Closed the bounded fake source");
context.signalNoMoreElement();
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
index 61a920548..3391205d8 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -31,17 +31,15 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
+import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
+@Slf4j
public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
- private static final Logger LOGGER = LoggerFactory.getLogger(MongodbSourceReader.class);
-
private final SingleSplitReaderContext context;
private MongoClient client;
@@ -102,7 +100,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
} finally {
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
- LOGGER.info("Closed the bounded mongodb source");
+ log.info("Closed the bounded mongodb source");
context.signalNoMoreElement();
}
}
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index d37ce180d..8a86fa47b 100644
--- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -23,8 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
@@ -32,8 +31,8 @@ import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
+@Slf4j
public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SocketSourceReader.class);
private static final int CHAR_BUFFER_SIZE = 8192;
private final SocketSourceParameter parameter;
private final SingleSplitReaderContext context;
@@ -48,7 +47,7 @@ public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
@Override
public void open() throws Exception {
socket = new Socket();
- LOGGER.info("connect socket server, host:[{}], port:[{}] ", this.parameter.getHost(), this.parameter.getPort());
+ log.info("connect socket server, host:[{}], port:[{}] ", this.parameter.getHost(), this.parameter.getPort());
socket.connect(new InetSocketAddress(this.parameter.getHost(), this.parameter.getPort()), 0);
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
index e5b293c2a..cfccf1d39 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
@@ -38,11 +38,10 @@ import org.apache.seatunnel.flink.clickhouse.sink.file.ScpFileTransfer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import java.io.BufferedReader;
@@ -66,9 +65,9 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+@Slf4j
public class ClickhouseFileOutputFormat {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileOutputFormat.class);
private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/flink-file";
private static final int UUID_LENGTH = 10;
@@ -195,7 +194,7 @@ public class ClickhouseFileOutputFormat {
uuid));
command.add("--path");
command.add("\"" + clickhouseLocalFile + "\"");
- LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+ log.info("Generate clickhouse local file command: {}", String.join(" ", command));
ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
Process start = processBuilder.start();
// we just wait for the process to finish
@@ -204,7 +203,7 @@ public class ClickhouseFileOutputFormat {
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
String line;
while ((line = bufferedReader.readLine()) != null) {
- LOGGER.info(line);
+ log.info(line);
}
}
start.waitFor();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index bf178aa3b..c0fd9cb99 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -26,18 +26,17 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@AutoService(BaseFlinkSink.class)
+@Slf4j
public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink, FlinkStreamSink {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSink.class);
private static final String LIMIT = "limit";
private Integer limit = Integer.MAX_VALUE;
@@ -49,7 +48,7 @@ public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink
try {
rowDataSet.first(limit).print();
} catch (Exception e) {
- LOGGER.error("Failed to print result! ", e);
+ log.error("Failed to print result! ", e);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
index e3a19d523..0cfeb3cd0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
@@ -17,14 +17,13 @@
package org.apache.seatunnel.flink.doris.sink;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,8 +41,8 @@ import java.util.regex.Pattern;
/**
* DorisDynamicOutputFormat
**/
+@Slf4j
public class DorisOutputFormat<T> extends RichOutputFormat<T> {
- private static final Logger LOGGER = LoggerFactory.getLogger(DorisOutputFormat.class);
private static final long serialVersionUID = -4514164348993670086L;
private static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -215,7 +214,7 @@ public class DorisOutputFormat<T> extends RichOutputFormat<T> {
batch.clear();
break;
} catch (Exception e) {
- LOGGER.error("doris sink error, retry times = {}", i, e);
+ log.error("doris sink error, retry times = {}", i, e);
if (i >= maxRetries) {
throw new IOException(e);
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
index ebeb20845..4d4b0e865 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.flink.doris.sink;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
@@ -43,9 +42,9 @@ import java.util.UUID;
/**
* doris streamLoad
*/
+@Slf4j
public class DorisStreamLoad implements Serializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(DorisStreamLoad.class);
private static final long serialVersionUID = -595233501819950489L;
private static final List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout");
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -84,7 +83,7 @@ public class DorisStreamLoad implements Serializable {
public void load(String value) {
LoadResponse loadResponse = loadBatch(value);
- LOGGER.info("Streamload Response:{}", loadResponse);
+ log.info("Streamload Response:{}", loadResponse);
if (loadResponse.status != HttpResponseStatus.OK.code()) {
throw new RuntimeException("stream load error: " + loadResponse.respContent);
} else {
@@ -140,7 +139,7 @@ public class DorisStreamLoad implements Serializable {
return new LoadResponse(status, respMsg, response.toString());
} catch (Exception e) {
String err = "failed to stream load data with label:" + label;
- LOGGER.warn(err, e);
+ log.warn(err, e);
throw new RuntimeException("stream load error: " + err);
} finally {
if (feConn != null) {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
index dca0c26c4..7cb18cd9a 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -44,8 +45,6 @@ import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@@ -57,9 +56,9 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+@Slf4j
public class DruidOutputFormat extends RichOutputFormat<Row> {
- private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
private static final long serialVersionUID = -7410857670269773005L;
private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
@@ -149,7 +148,7 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
- LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+ log.info("Druid write task has been sent, and the response is {}", response.toString());
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 83478f57d..d964bb666 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -36,14 +36,13 @@ import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -54,12 +53,11 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+@Slf4j
@AutoService(BaseFlinkSource.class)
public class DruidSource implements FlinkBatchSource {
private static final long serialVersionUID = 8152628883440481281L;
- private static final Logger LOGGER = LoggerFactory.getLogger(DruidSource.class);
-
private Config config;
private DruidInputFormat druidInputFormat;
@@ -150,7 +148,7 @@ public class DruidSource implements FlinkBatchSource {
}
}
} catch (Exception e) {
- LOGGER.warn("Failed to get column information from JDBC URL: {}", jdbcURL, e);
+ log.warn("Failed to get column information from JDBC URL: {}", jdbcURL, e);
}
int size = map.size();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
index 4687bd126..83c1d9d95 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.elasticsearch6.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@@ -33,16 +34,14 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.List;
+@Slf4j
public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
private static final long serialVersionUID = 2048590860723433896L;
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
private final Config config;
@@ -78,7 +77,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
try {
transportClient.addTransportAddresses(new TransportAddress(InetAddress.getByName(host.split(":")[0]), Integer.parseInt(host.split(":")[1])));
} catch (Exception e) {
- LOGGER.warn("Host '{}' parse failed.", host, e);
+ log.warn("Host '{}' parse failed.", host, e);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
index a55e38424..18341f7d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
@@ -21,6 +21,7 @@ import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@@ -35,16 +36,14 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.List;
+@Slf4j
public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
private static final long serialVersionUID = 2048590860723433896L;
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
private final Config config;
@@ -80,7 +79,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
try {
transportClient.addTransportAddresses(new TransportAddress(InetAddress.getByName(host.split(":")[0]), Integer.parseInt(host.split(":")[1])));
} catch (Exception e) {
- LOGGER.warn("Host '{}' parse failed.", host, e);
+ log.warn("Host '{}' parse failed.", host, e);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index b8d3400c8..c1c0ae73e 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.DataSet;
@@ -43,16 +44,13 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
+@Slf4j
@AutoService(BaseFlinkSink.class)
public class FileSink implements FlinkStreamSink, FlinkBatchSink {
- private static final Logger LOGGER = LoggerFactory.getLogger(FileSink.class);
-
private static final long serialVersionUID = -1648045076508797396L;
private static final String PATH = "path";
@@ -114,7 +112,7 @@ public class FileSink implements FlinkStreamSink, FlinkBatchSink {
outputFormat = new TextOutputFormat<>(filePath);
break;
default:
- LOGGER.warn(" unknown file_format [{}],only support json,csv,text", format);
+ log.warn(" unknown file_format [{}],only support json,csv,text", format);
break;
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 1ccb3cb98..67530ea1a 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -51,8 +52,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -60,10 +59,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
+@Slf4j
@AutoService(BaseFlinkSink.class)
public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class);
private static final long serialVersionUID = 3677571223952518115L;
private static final int DEFAULT_BATCH_SIZE = 5000;
private static final int DEFAULT_MAX_RETRY_TIMES = 3;
@@ -189,11 +188,11 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private void executePreSql() {
if (StringUtils.isNotBlank(preSql)) {
- LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+ log.info("Starting to execute pre sql: \n {}", preSql);
try {
executeSql(preSql);
} catch (SQLException e) {
- LOGGER.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, e);
+ log.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, e);
throw new RuntimeException(e);
}
}
@@ -201,11 +200,11 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private void executePostSql() {
if (StringUtils.isNotBlank(postSql)) {
- LOGGER.info("Starting to execute post sql: \n {}", postSql);
+ log.info("Starting to execute post sql: \n {}", postSql);
try {
executeSql(postSql);
} catch (SQLException e) {
- LOGGER.error("Execute post sql failed, post sql is : \n {} \n", postSql, e);
+ log.error("Execute post sql failed, post sql is : \n {} \n", postSql, e);
if (!ignorePostSqlExceptions) {
throw new RuntimeException(e);
}
@@ -215,10 +214,10 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private void executeSql(String sql) throws SQLException {
try (Connection connection = DriverManager.getConnection(dbUrl, username, password);
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
statement.execute(sql);
- LOGGER.info("Executed sql successfully.");
+ log.info("Executed sql successfully.");
}
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index c01e1e0ee..efc0130a0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -49,6 +49,7 @@ import org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -57,8 +58,6 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -70,11 +69,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+@Slf4j
@AutoService(BaseFlinkSource.class)
public class JdbcSource implements FlinkBatchSource {
private static final long serialVersionUID = -3349505356339446415L;
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSource.class);
private static final int DEFAULT_FETCH_SIZE = 10000;
private Config config;
@@ -205,7 +204,7 @@ public class JdbcSource implements FlinkBatchSource {
return getRowInfo(rs.getMetaData(), databaseDialect);
}
} catch (SQLException e) {
- LOGGER.warn("get row type info exception", e);
+ log.warn("get row type info exception", e);
}
return new LinkedHashMap<>();
}
@@ -248,7 +247,7 @@ public class JdbcSource implements FlinkBatchSource {
return new PostgresTypeInformationMap();
} else if (StringUtils.containsIgnoreCase(databaseDialect, "oracle")) {
return new OracleTypeInformationMap();
- } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")){
+ } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")) {
return new HiveTypeInformationMap();
} else {
return new DefaultTypeInformationMap();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index dbb60c7cf..f962fee4d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -42,17 +43,15 @@ import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
+@Slf4j
@AutoService(BaseFlinkSource.class)
public class KafkaTableStream implements FlinkStreamSource {
private static final long serialVersionUID = 5287018194573371428L;
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableStream.class);
private Config config;
@@ -175,7 +174,7 @@ public class KafkaTableStream implements FlinkStreamSource {
try {
return SchemaUtil.setFormat(format, config);
} catch (Exception e) {
- LOGGER.warn("set format exception", e);
+ log.warn("set format exception", e);
}
throw new RuntimeException("format config error");
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index e45872848..4c70e776d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -22,18 +22,17 @@ import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.exception.CommandException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class Seatunnel {
- private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
/**
* This method is the entrypoint of SeaTunnel.
*
* @param command commandArgs
- * @param <T> commandType
+ * @param <T> commandType
*/
public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
try {
@@ -48,26 +47,26 @@ public class Seatunnel {
}
private static void showConfigError(Throwable throwable) {
- LOGGER.error(
+ log.error(
"\n\n===============================================================================\n\n");
String errorMsg = throwable.getMessage();
- LOGGER.error("Config Error:\n");
- LOGGER.error("Reason: {} \n", errorMsg);
- LOGGER.error(
+ log.error("Config Error:\n");
+ log.error("Reason: {} \n", errorMsg);
+ log.error(
"\n===============================================================================\n\n\n");
}
private static void showFatalError(Throwable throwable) {
- LOGGER.error(
+ log.error(
"\n\n===============================================================================\n\n");
String errorMsg = throwable.getMessage();
- LOGGER.error("Fatal Error, \n");
+ log.error("Fatal Error, \n");
// FIX
- LOGGER.error(
+ log.error(
"Please submit bug report in https://github.com/apache/incubator-seatunnel/issues\n");
- LOGGER.error("Reason:{} \n", errorMsg);
- LOGGER.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
- LOGGER.error(
+ log.error("Reason:{} \n", errorMsg);
+ log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
+ log.error(
"\n===============================================================================\n\n\n");
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
index b14b6fa96..8cc825b2b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
@@ -27,8 +27,7 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.utils.AsciiArtUtils;
import org.apache.seatunnel.core.base.utils.CompressionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.util.List;
@@ -39,10 +38,9 @@ import java.util.Objects;
*
* @param <T> command args.
*/
+@Slf4j
public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E extends RuntimeEnv> implements Command<T> {
- private static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
-
/**
* Check the plugin config.
*
@@ -117,7 +115,7 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
checkResult = CheckResult.error(e.getMessage());
}
if (!checkResult.isSuccess()) {
- LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
+ log.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
System.exit(-1); // invalid configuration
}
}
@@ -128,11 +126,11 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
final DeployMode mode = Common.getDeployMode();
if (DeployMode.CLUSTER == mode) {
- LOGGER.info("preparing cluster mode work dir files...");
+ log.info("preparing cluster mode work dir files...");
File workDir = new File(".");
for (File file : Objects.requireNonNull(workDir.listFiles())) {
- LOGGER.warn("\t list file: {} ", file.getAbsolutePath());
+ log.warn("\t list file: {} ", file.getAbsolutePath());
}
// decompress plugin dir
File compressedFile = new File("plugins.tar.gz");
@@ -141,10 +139,10 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
CompressionUtils.unTar(tempFile, workDir);
} catch (Exception e) {
- LOGGER.error("failed to decompress plugins.tar.gz", e);
+ log.error("failed to decompress plugins.tar.gz", e);
System.exit(-1);
}
- LOGGER.info("succeeded to decompress plugins.tar.gz");
+ log.info("succeeded to decompress plugins.tar.gz");
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 5f80ae407..4a1d4485b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -24,19 +24,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Used to build the {@link Config} from file.
- *
*/
+@Slf4j
public class ConfigBuilder {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
-
private final Path configFile;
private final Config config;
@@ -51,7 +48,7 @@ public class ConfigBuilder {
throw new ConfigRuntimeException("Please specify config file");
}
- LOGGER.info("Loading config file: {}", configFile);
+ log.info("Loading config file: {}", configFile);
// variables substitution / variables resolution order:
// config file --> system environment --> java properties
@@ -62,7 +59,7 @@ public class ConfigBuilder {
ConfigResolveOptions.defaults().setAllowUnresolved(true));
ConfigRenderOptions options = ConfigRenderOptions.concise().setFormatted(true);
- LOGGER.info("parsed config file: {}", config.root().render(options));
+ log.info("parsed config file: {}", config.root().render(options));
return config;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
index 0c4fd178c..25c4f4d7d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
@@ -30,18 +30,16 @@ import org.apache.seatunnel.spark.batch.SparkBatchExecution;
import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
/**
* Used to create {@link Execution}.
*
* @param <ENVIRONMENT> environment type
*/
+@Slf4j
public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);
-
public AbstractExecutionContext<ENVIRONMENT> executionContext;
public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
@@ -77,7 +75,7 @@ public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
default:
throw new IllegalArgumentException("No suitable engine");
}
- LOGGER.info("current execution is [{}]", execution.getClass().getName());
+ log.info("current execution is [{}]", execution.getClass().getName());
return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
index 326d5d377..fb9b75320 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.core.base.utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.awt.Font;
import java.awt.Graphics;
@@ -26,10 +25,9 @@ import java.awt.Graphics2D;
import java.awt.RenderingHints;
import java.awt.image.BufferedImage;
+@Slf4j
public final class AsciiArtUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(AsciiArtUtils.class);
-
private static final int FONT_SIZE = 24;
private static final int DRAW_X = 6;
private static final int RGB = -16777216;
@@ -66,7 +64,7 @@ public final class AsciiArtUtils {
if (sb.toString().trim().isEmpty()) {
continue;
}
- LOGGER.info(String.valueOf(sb));
+ log.info(String.valueOf(sb));
}
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
index 87673eb03..636c2c9cb 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.base.utils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -24,8 +25,6 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -44,10 +43,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.zip.GZIPInputStream;
+@Slf4j
public final class CompressionUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(CompressionUtils.class);
-
private CompressionUtils() {
}
@@ -58,7 +56,7 @@ public final class CompressionUtils {
* @param outputFile the output tarball file.
*/
public static void tarGzip(final Path inputDir, final Path outputFile) throws IOException {
- LOGGER.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
+ log.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
try (OutputStream out = Files.newOutputStream(outputFile);
BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bufferedOut);
@@ -78,9 +76,9 @@ public final class CompressionUtils {
}
});
tarOut.finish();
- LOGGER.info("Creating tar file '{}'.", outputFile);
+ log.info("Creating tar file '{}'.", outputFile);
} catch (IOException e) {
- LOGGER.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
+ log.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
throw e;
}
}
@@ -99,7 +97,7 @@ public final class CompressionUtils {
*/
public static void unTar(final File inputFile, final File outputDir) throws IOException, ArchiveException {
- LOGGER.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+ log.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
final List<File> untaredFiles = new LinkedList<>();
try (final InputStream is = new FileInputStream(inputFile);
@@ -111,15 +109,15 @@ public final class CompressionUtils {
throw new IllegalStateException("Bad zip entry");
}
if (entry.isDirectory()) {
- LOGGER.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
+ log.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
if (!outputFile.exists()) {
- LOGGER.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
+ log.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
if (!outputFile.mkdirs()) {
throw new IllegalStateException(String.format("Couldn't create directory %s.", outputFile.getAbsolutePath()));
}
}
} else {
- LOGGER.info("Creating output file {}.", outputFile.getAbsolutePath());
+ log.info("Creating output file {}.", outputFile.getAbsolutePath());
File outputParentFile = outputFile.getParentFile();
if (outputParentFile != null && !outputParentFile.exists()) {
outputParentFile.mkdirs();
@@ -147,7 +145,7 @@ public final class CompressionUtils {
*/
public static File unGzip(final File inputFile, final File outputDir) throws IOException {
- LOGGER.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+ log.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
final File outputFile = new File(outputDir, inputFile.getName().substring(0, inputFile.getName().length() - 3));
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
index 5777805a7..59b570283 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.core.sql.classloader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.net.MalformedURLException;
import java.net.URL;
@@ -26,10 +25,9 @@ import java.net.URLClassLoader;
import java.nio.file.Path;
// TODO: maybe a unified plugin-style discovery mechanism is better.
+@Slf4j
public class CustomClassLoader extends URLClassLoader {
- private static final Logger LOGGER = LoggerFactory.getLogger(CustomClassLoader.class);
-
public CustomClassLoader() {
super(new URL[0]);
}
@@ -45,7 +43,7 @@ public class CustomClassLoader extends URLClassLoader {
try {
this.addURL(jarPath.toUri().toURL());
} catch (MalformedURLException e) {
- LOGGER.error("Failed to add jar to classloader. Jar: {}", jarPath, e);
+ log.error("Failed to add jar to classloader. Jar: {}", jarPath, e);
}
}
}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
index 044262ee8..7cc0db5a8 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.sql.classloader.CustomClassLoader;
import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
@@ -37,8 +38,6 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.MalformedURLException;
@@ -52,14 +51,13 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+@Slf4j
public class Executor {
- private static final Logger LOGGER = LoggerFactory.getLogger(Executor.class);
-
private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
private static final int FLINK_SQL_SET_OPERANDS = 3;
- private static CustomClassLoader CLASSLOADER = new CustomClassLoader();
+ private static final CustomClassLoader CLASSLOADER = new CustomClassLoader();
private static final String CONNECTOR_IDENTIFIER = "connector";
private static final String SQL_CONNECTOR_PREFIX = "flink-sql";
@@ -156,7 +154,7 @@ public class Executor {
.collect(Collectors.toList());
if (connectorFiles.size() > 1) {
- LOGGER.warn("Found more than one connector jars for {}. Only the first one will be loaded.", connectorType);
+ log.warn("Found more than one connector jars for {}. Only the first one will be loaded.", connectorType);
}
File connectorFile = connectorFiles.size() >= 1 ? connectorFiles.get(0) : null;
@@ -179,7 +177,7 @@ public class Executor {
configuration.set(PipelineOptions.JARS, jars);
configuration.set(PipelineOptions.CLASSPATHS, classpath);
} catch (MalformedURLException ignored) {
- LOGGER.error("Failed to load connector {}. Connector file: {}", connectorType, connectorFile.getAbsolutePath());
+ log.error("Failed to load connector {}. Connector file: {}", connectorType, connectorFile.getAbsolutePath());
}
}
}
@@ -199,7 +197,7 @@ public class Executor {
return Optional.empty();
}
- private static Optional<Pair<String, String>> operandConverter(String[] operands){
+ private static Optional<Pair<String, String>> operandConverter(String[] operands) {
if (operands.length != FLINK_SQL_SET_OPERANDS) {
return Optional.empty();
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
index f03522a04..253b69911 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkApiConfigChecker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Used to check the Flink conf is validated.
*/
+@Slf4j
public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
-
private final FlinkCommandArgs flinkCommandArgs;
public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
@@ -47,6 +45,6 @@ public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
- LOGGER.info("config OK !");
+ log.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 0f4a93077..6aef6317d 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.config.SparkApiConfigChecker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Used to validate the spark task conf is validated.
*/
+@Slf4j
public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkConfValidateCommand.class);
-
private final SparkCommandArgs sparkCommandArgs;
public SparkConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
@@ -47,6 +45,6 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
ConfigBuilder configBuilder = new ConfigBuilder(confPath);
new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
- LOGGER.info("config OK !");
+ log.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
index 330aadb85..e4de62add 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
@@ -22,18 +22,17 @@ import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.command.CommandArgs;
import org.apache.seatunnel.core.starter.exception.CommandException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class Seatunnel {
- private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
/**
* This method is the entrypoint of SeaTunnel.
*
* @param command commandArgs
- * @param <T> commandType
+ * @param <T> commandType
*/
public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
try {
@@ -48,26 +47,26 @@ public class Seatunnel {
}
private static void showConfigError(Throwable throwable) {
- LOGGER.error(
+ log.error(
"\n\n===============================================================================\n\n");
String errorMsg = throwable.getMessage();
- LOGGER.error("Config Error:\n");
- LOGGER.error("Reason: {} \n", errorMsg);
- LOGGER.error(
+ log.error("Config Error:\n");
+ log.error("Reason: {} \n", errorMsg);
+ log.error(
"\n===============================================================================\n\n\n");
}
private static void showFatalError(Throwable throwable) {
- LOGGER.error(
+ log.error(
"\n\n===============================================================================\n\n");
String errorMsg = throwable.getMessage();
- LOGGER.error("Fatal Error, \n");
+ log.error("Fatal Error, \n");
// FIX
- LOGGER.error(
+ log.error(
"Please submit bug report in https://github.com/apache/incubator-seatunnel/issues\n");
- LOGGER.error("Reason:{} \n", errorMsg);
- LOGGER.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
- LOGGER.error(
+ log.error("Reason:{} \n", errorMsg);
+ log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
+ log.error(
"\n===============================================================================\n\n\n");
}
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
index 968aa60a4..e96074296 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
@@ -24,19 +24,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Used to build the {@link Config} from file.
- *
*/
+@Slf4j
public class ConfigBuilder {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
-
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
private final Config config;
@@ -52,7 +49,7 @@ public class ConfigBuilder {
throw new ConfigRuntimeException("Please specify config file");
}
- LOGGER.info("Loading config file: {}", configFile);
+ log.info("Loading config file: {}", configFile);
// variables substitution / variables resolution order:
// config file --> system environment --> java properties
@@ -63,7 +60,7 @@ public class ConfigBuilder {
ConfigResolveOptions.defaults().setAllowUnresolved(true));
ConfigRenderOptions options = ConfigRenderOptions.concise().setFormatted(true);
- LOGGER.info("parsed config file: {}", config.root().render(options));
+ log.info("parsed config file: {}", config.root().render(options));
return config;
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
index b0c47d2ac..24a029860 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
@@ -20,8 +20,7 @@ package org.apache.seatunnel.core.starter.config;
public enum EngineType {
SPARK("spark"),
FLINK("flink"),
- SEATUNNEL("seatunnel"),
- ;
+ SEATUNNEL("seatunnel");
private final String engine;
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
index fa572f23e..3c38b74d4 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.core.starter.utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.awt.Font;
import java.awt.Graphics;
@@ -26,10 +25,9 @@ import java.awt.Graphics2D;
import java.awt.RenderingHints;
import java.awt.image.BufferedImage;
+@Slf4j
public final class AsciiArtUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(AsciiArtUtils.class);
-
private static final int FONT_SIZE = 24;
private static final int DRAW_X = 6;
private static final int RGB = -16777216;
@@ -66,7 +64,7 @@ public final class AsciiArtUtils {
if (sb.toString().trim().isEmpty()) {
continue;
}
- LOGGER.info(String.valueOf(sb));
+ log.info(String.valueOf(sb));
}
}
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
index 0554e7091..66efe2923 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.utils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -24,8 +25,6 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -44,10 +43,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.zip.GZIPInputStream;
+@Slf4j
public final class CompressionUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(CompressionUtils.class);
-
private CompressionUtils() {
}
@@ -58,7 +56,7 @@ public final class CompressionUtils {
* @param outputFile the output tarball file.
*/
public static void tarGzip(final Path inputDir, final Path outputFile) throws IOException {
- LOGGER.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
+ log.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
try (OutputStream out = Files.newOutputStream(outputFile);
BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bufferedOut);
@@ -78,9 +76,9 @@ public final class CompressionUtils {
}
});
tarOut.finish();
- LOGGER.info("Creating tar file '{}'.", outputFile);
+ log.info("Creating tar file '{}'.", outputFile);
} catch (IOException e) {
- LOGGER.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
+ log.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
throw e;
}
}
@@ -99,7 +97,7 @@ public final class CompressionUtils {
*/
public static void unTar(final File inputFile, final File outputDir) throws IOException, ArchiveException {
- LOGGER.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+ log.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
final List<File> untaredFiles = new LinkedList<>();
try (final InputStream is = new FileInputStream(inputFile);
@@ -111,15 +109,15 @@ public final class CompressionUtils {
throw new IllegalStateException("Bad zip entry");
}
if (entry.isDirectory()) {
- LOGGER.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
+ log.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
if (!outputFile.exists()) {
- LOGGER.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
+ log.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
if (!outputFile.mkdirs()) {
throw new IllegalStateException(String.format("Couldn't create directory %s.", outputFile.getAbsolutePath()));
}
}
} else {
- LOGGER.info("Creating output file {}.", outputFile.getAbsolutePath());
+ log.info("Creating output file {}.", outputFile.getAbsolutePath());
final OutputStream outputFileStream = new FileOutputStream(outputFile);
IOUtils.copy(debInputStream, outputFileStream);
outputFileStream.close();
@@ -143,7 +141,7 @@ public final class CompressionUtils {
*/
public static File unGzip(final File inputFile, final File outputDir) throws IOException {
- LOGGER.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+ log.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
final File outputFile = new File(outputDir, inputFile.getName().substring(0, inputFile.getName().length() - 3));
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
index 5ca54a392..daedfaf34 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.config.FlinkApiConfigChecker;
import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Use to validate the configuration of the SeaTunnel API.
*/
+@Slf4j
public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
-
private final FlinkCommandArgs flinkCommandArgs;
public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
@@ -48,6 +46,6 @@ public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
// todo: validate the config by new api
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
- LOGGER.info("config OK !");
+ log.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
index 9fbbca174..19395db1a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
@@ -26,8 +26,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
@@ -35,10 +34,9 @@ import java.nio.file.Path;
* todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
* This command is used to execute the Flink job by SeaTunnel new API.
*/
+@Slf4j
public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiTaskExecuteCommand.class);
-
private final FlinkCommandArgs flinkCommandArgs;
public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 51b36e7d2..4407a3e13 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -28,10 +28,9 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URL;
@@ -43,9 +42,10 @@ import java.util.stream.Collectors;
/**
* Used to execute a SeaTunnelTask.
*/
+
+@Slf4j
public class FlinkExecution implements TaskExecution {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
@@ -68,7 +68,7 @@ public class FlinkExecution implements TaskExecution {
dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
sinkPluginExecuteProcessor.execute(dataStreams);
- LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+ log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
try {
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
} catch (Exception e) {
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
index 7b9d6782a..213a8d900 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
@@ -24,17 +24,16 @@ import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.spark.config.SparkApiConfigChecker;
import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Use to validate the configuration of the SeaTunnel API.
*/
-public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiConfValidateCommand.class);
+@Slf4j
+public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
private final SparkCommandArgs sparkCommandArgs;
@@ -48,6 +47,6 @@ public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
// todo: validate the config by new api
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
- LOGGER.info("config OK !");
+ log.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index 2eb472937..5fdc4e28d 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -26,8 +26,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
@@ -35,10 +34,9 @@ import java.nio.file.Path;
* todo: do we need to move these class to a new module? since this may cause version conflict with the old Spark version.
* This command is used to execute the Spark job by SeaTunnel new API.
*/
+@Slf4j
public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiTaskExecuteCommand.class);
-
private final SparkCommandArgs sparkCommandArgs;
public SparkApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
@@ -53,7 +51,7 @@ public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
seaTunnelTaskExecution.execute();
} catch (Exception e) {
- LOGGER.error("Run SeaTunnel on spark failed.", e);
+ log.error("Run SeaTunnel on spark failed.", e);
throw new CommandExecuteException(e.getMessage());
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 62890277c..ed2e60e91 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -25,17 +25,16 @@ import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
public class SparkExecution {
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkExecution.class);
private final SparkEnvironment sparkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
@@ -56,6 +55,6 @@ public class SparkExecution {
datasets = transformPluginExecuteProcessor.execute(datasets);
sinkPluginExecuteProcessor.execute(datasets);
- LOGGER.info("Spark Execution started");
+ log.info("Spark Execution started");
}
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
index 20bc93659..0a05d7c4d 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
/**
* Use to validate the configuration of the SeaTunnel API.
*/
+@Slf4j
public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ApiConfValidateCommand.class);
-
private final ClientCommandArgs clientCommandArgs;
public ApiConfValidateCommand(ClientCommandArgs clientCommandArgs) {
@@ -47,6 +45,6 @@ public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
Path configPath = FileUtils.getConfigPath(clientCommandArgs);
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
- LOGGER.info("config OK !");
+ log.info("config OK !");
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
index cf412da33..60746e25b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -26,13 +26,12 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@@ -45,10 +44,10 @@ import javax.transaction.xa.Xid;
import java.util.stream.Stream;
+@Slf4j
@Disabled("Temporary fast fix, reason: JdbcDatabaseContainer: ClassNotFoundException: com.mysql.jdbc.Driver")
class XaGroupOpsImplIT {
- private static final Logger LOGGER = LoggerFactory.getLogger(XaGroupOpsImplIT.class);
private MySQLContainer<?> mc;
private XaGroupOps xaGroupOps;
private SemanticXidGenerator xidGenerator;
@@ -61,7 +60,7 @@ class XaGroupOpsImplIT {
// Non-root users need to grant XA_RECOVER_ADMIN permission
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
.withUsername("root")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(mc)).join();
jdbcConnectionOptions = JdbcConnectionOptions.builder()
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index fcd2815b2..03d2cca0c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -32,21 +32,19 @@ import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import com.google.common.collect.Lists;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+@Slf4j
@Disabled("Disabled because connector-v2 jar dist not exist")
public class JobExecutionIT {
- private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);
-
@BeforeAll
public static void beforeClass() throws Exception {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index 792496a4e..12f4b2a06 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -45,8 +44,8 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+@Slf4j
public class FakeSourceToJdbcIT extends FlinkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
private PostgreSQLContainer<?> psl;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@@ -56,9 +55,9 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(psl)).join();
- LOGGER.info("PostgreSql container started");
+ log.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
index 351f0e93e..2d0a75e1e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
@@ -25,12 +25,11 @@ import org.apache.seatunnel.e2e.flink.FlinkContainer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
@@ -57,8 +56,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class JdbcMysqlIT extends FlinkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMysqlIT.class);
private MySQLContainer<?> mc;
private Config config;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -71,9 +70,9 @@ public class JdbcMysqlIT extends FlinkContainer {
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(mc)).join();
- LOGGER.info("Mysql container started");
+ log.info("Mysql container started");
Class.forName(mc.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
index 8fa6e6e81..f946b4ee9 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
@@ -26,8 +26,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -52,7 +50,6 @@ import java.util.stream.Stream;
@Slf4j
public class JdbcPostgresIT extends FlinkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
private PostgreSQLContainer<?> pg;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@@ -64,9 +61,9 @@ public class JdbcPostgresIT extends FlinkContainer {
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
.withUsername("root")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(pg)).join();
- LOGGER.info("Postgres container started");
+ log.info("Postgres container started");
Class.forName(pg.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index 3a42c1d3a..64e09c85d 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -21,12 +21,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -43,8 +42,8 @@ import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+@Slf4j
public class JdbcSourceToConsoleIT extends FlinkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
private PostgreSQLContainer<?> psl;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@@ -54,9 +53,9 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(psl)).join();
- LOGGER.info("PostgreSql container started");
+ log.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
index 4790c2bb7..fbd87ee9c 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -44,23 +43,22 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+@Slf4j
public class FakeSourceToClickhouseIT extends FlinkContainer {
private GenericContainer<?> clickhouseServer;
private BalancedClickhouseDataSource dataSource;
private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:22.1.3.7";
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToClickhouseIT.class);
-
@BeforeEach
public void startClickhouseContainer() throws InterruptedException {
clickhouseServer = new GenericContainer<>(CLICKHOUSE_DOCKER_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases("clickhouse")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
clickhouseServer.setPortBindings(Lists.newArrayList("8123:8123"));
Startables.deepStart(Stream.of(clickhouseServer)).join();
- LOGGER.info("Clickhouse container started");
+ log.info("Clickhouse container started");
// wait for clickhouse fully start
dataSource = createDatasource();
given().ignoreExceptions()
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
index d861e8b4d..68a38aa4b 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
@@ -19,16 +19,14 @@ package org.apache.seatunnel.e2e.flink.file;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
+@Slf4j
public class FakeSourceToFileIT extends FlinkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToFileIT.class);
-
@Test
public void testFakeSource2FileSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_file.conf");
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
index 217731032..08eb08535 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
@@ -25,12 +25,11 @@ import org.apache.seatunnel.e2e.spark.SparkContainer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
@@ -57,8 +56,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class JdbcMysqlIT extends SparkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMysqlIT.class);
private MySQLContainer<?> mc;
private Config config;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -71,9 +70,9 @@ public class JdbcMysqlIT extends SparkContainer {
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(mc)).join();
- LOGGER.info("Mysql container started");
+ log.info("Mysql container started");
Class.forName(mc.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
index bbc2bae7f..03770e8a5 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
@@ -21,12 +21,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.spark.SparkContainer;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -49,8 +48,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class JdbcPostgresIT extends SparkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
private PostgreSQLContainer<?> pg;
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@@ -61,9 +60,9 @@ public class JdbcPostgresIT extends SparkContainer {
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(pg)).join();
- LOGGER.info("Postgres container started");
+ log.info("Postgres container started");
Class.forName(pg.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index c2d2a1426..9051c17e7 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.spark.SparkContainer;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -44,8 +43,8 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+@Slf4j
public class FakeSourceToJdbcIT extends SparkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
private PostgreSQLContainer<?> psl;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -54,9 +53,9 @@ public class FakeSourceToJdbcIT extends SparkContainer {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
Startables.deepStart(Stream.of(psl)).join();
- LOGGER.info("PostgreSql container started");
+ log.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index cf9be5e9e..eb999dad8 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.spark.SparkContainer;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -43,8 +42,8 @@ import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+@Slf4j
public class JdbcSourceToConsoleIT extends SparkContainer {
- private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
private PostgreSQLContainer<?> psl;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -53,10 +52,10 @@ public class JdbcSourceToConsoleIT extends SparkContainer {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withLogConsumer(new Slf4jLogConsumer(log));
psl.setPortBindings(Lists.newArrayList("33306:3306"));
Startables.deepStart(Stream.of(psl)).join();
- LOGGER.info("PostgreSql container started");
+ log.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
given().ignoreExceptions()
.await()
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index e45e0b6cc..a4d243670 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -40,7 +40,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
- <version>${version}</version>
+ <version>${project.version}</version>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 8fffecddd..c75e45dad 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -25,10 +25,9 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -46,9 +45,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+@Slf4j
public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
private final Path pluginDir;
/**
@@ -69,12 +68,12 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
this.addURLToClassLoader = addURLToClassloader;
- LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
+ log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
public AbstractPluginDiscovery(String pluginSubDir) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
- LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
+ log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
@Override
@@ -98,7 +97,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
- LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+ log.info("Load plugin: {} from classpath", pluginIdentifier);
return pluginInstance;
}
Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
@@ -108,13 +107,13 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
// use current thread classloader to avoid different classloader load same class error.
this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
} catch (Exception e) {
- LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
+ log.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
" message: " + e.getMessage());
classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
}
pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
- LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
+ log.info("Load plugin: {} from path: {} use classloader: {}",
pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
return pluginInstance;
}
@@ -201,10 +200,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
}
try {
URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
- LOGGER.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
+ log.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
- LOGGER.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+ log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
return Optional.empty();
}
}
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
index 7c5d7f2e3..c55f9b8da 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
+++ b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
@@ -26,13 +26,12 @@ import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -40,9 +39,9 @@ import java.util.Map;
import java.util.Properties;
@SuppressWarnings("PMD")
+@Slf4j
public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
- private static final Logger LOGGER = LoggerFactory.getLogger(UDF.class);
private static final String UDF_CONFIG_PREFIX = "function.";
private Config config;
@@ -68,7 +67,7 @@ public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
try {
tEnv.createTemporarySystemFunction(functionNames.get(i), (Class<? extends UserDefinedFunction>) Class.forName(classNames.get(i)));
} catch (ClassNotFoundException e) {
- LOGGER.error("The udf class {} not founded, make sure you enter the correct class name", classNames.get(i));
+ log.error("The udf class {} not founded, make sure you enter the correct class name", classNames.get(i));
throw new RuntimeException(e);
}
}
@@ -109,9 +108,9 @@ public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
return "udf";
}
- private void hasSubConfig(String prefix){
+ private void hasSubConfig(String prefix) {
for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
- if (entry.getKey().startsWith(prefix)){
+ if (entry.getKey().startsWith(prefix)) {
return;
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 4cdfeb301..2f47df964 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -19,19 +19,17 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.sink.SinkCommitter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink.Committer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCommitter.class);
-
private final SinkCommitter<CommT> sinkCommitter;
FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
@@ -44,7 +42,7 @@ public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
.map(CommitWrapper::getCommit)
.collect(Collectors.toList()));
if (reCommittable != null && !reCommittable.isEmpty()) {
- LOGGER.warn("this version not support re-commit when use flink engine");
+ log.warn("this version not support re-commit when use flink engine");
}
// TODO re-commit the data
return new ArrayList<>();
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index f71634351..d9a060d7a 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -19,9 +19,8 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,10 +28,9 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkGlobalCommitter.class);
-
private final SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
@@ -55,7 +53,7 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter
public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
List<GlobalCommT> reCommittable = aggregatedCommitter.commit(globalCommittables);
if (reCommittable != null && !reCommittable.isEmpty()) {
- LOGGER.warn("this version not support re-commit when use flink engine");
+ log.warn("this version not support re-commit when use flink engine");
}
// TODO re-commit the data
return new ArrayList<>();
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 92494d49e..81c491e74 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -26,9 +26,8 @@ import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.seatunnel.translation.spark.common.InternalRowCollector;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -36,10 +35,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+@Slf4j
public class ParallelBatchPartitionReader {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
-
protected static final Integer INTERVAL = 100;
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -97,7 +95,7 @@ public class ParallelBatchPartitionReader {
internalSource.run(new InternalRowCollector(handover, checkpointLock, source.getProducedType()));
} catch (Exception e) {
handover.reportError(e);
- LOGGER.error("BatchPartitionReader execute failed.", e);
+ log.error("BatchPartitionReader execute failed.", e);
running = false;
}
});