Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/12 11:21:13 UTC

[shardingsphere] branch master updated: Clean pipeline log (#22110)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 790e9d27806 Clean pipeline log (#22110)
790e9d27806 is described below

commit 790e9d278064b2a41c3d2d5ec610a9bd3c8ca3da
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Nov 12 19:21:04 2022 +0800

    Clean pipeline log (#22110)
    
    * Improve code style
    
    * Clean pipeline log
---
 .../distsql/handler/update/MigrateTableUpdater.java    |  1 -
 .../api/executor/AbstractLifecycleExecutor.java        |  1 -
 .../impl/AbstractInventoryIncrementalJobAPIImpl.java   |  1 -
 .../core/api/impl/AbstractPipelineJobAPIImpl.java      |  4 ----
 .../core/api/impl/GovernanceRepositoryAPIImpl.java     |  6 ------
 ...ractStreamingDataConsistencyCalculateAlgorithm.java |  9 ---------
 .../DataMatchDataConsistencyCalculateAlgorithm.java    |  2 +-
 .../check/datasource/AbstractDataSourceChecker.java    |  1 -
 .../data/pipeline/core/importer/DefaultImporter.java   | 18 ------------------
 .../pipeline/core/ingest/dumper/InventoryDumper.java   |  3 ---
 .../data/pipeline/core/job/AbstractPipelineJob.java    |  2 --
 .../data/pipeline/core/job/PipelineJobCenter.java      |  3 ---
 .../persist/PipelineJobProgressPersistService.java     |  3 ---
 .../core/metadata/generator/PipelineDDLGenerator.java  |  4 ++--
 .../loader/StandardPipelineTableMetaDataLoader.java    |  2 --
 .../pipeline/core/prepare/InventoryTaskSplitter.java   |  2 --
 .../core/prepare/PipelineJobPreparerUtils.java         | 14 ++------------
 .../prepare/datasource/AbstractDataSourcePreparer.java |  2 --
 .../data/pipeline/core/task/IncrementalTask.java       |  2 --
 .../core/task/InventoryIncrementalTasksRunner.java     |  5 -----
 .../data/pipeline/core/task/InventoryTask.java         |  2 --
 .../pipeline/core/util/PipelineDistributedBarrier.java |  2 --
 ...nsistencyCheckChangedJobConfigurationProcessor.java |  3 ---
 .../consistencycheck/ConsistencyCheckTasksRunner.java  |  4 ----
 .../MigrationChangedJobConfigurationProcessor.java     |  3 ---
 .../migration/MigrationDataConsistencyChecker.java     |  1 -
 .../scenario/migration/MigrationJobAPIImpl.java        | 11 ++---------
 .../scenario/migration/MigrationJobPreparer.java       |  5 -----
 .../datasource/MySQLJdbcQueryPropertiesExtension.java  |  4 +---
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java  | 14 +++++---------
 .../data/pipeline/mysql/ingest/client/MySQLClient.java |  4 +---
 .../check/datasource/PostgreSQLDataSourceChecker.java  |  2 +-
 32 files changed, 15 insertions(+), 125 deletions(-)

diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index a79ccbf9b71..07b8e3c8186 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -35,7 +35,6 @@ public final class MigrateTableUpdater implements RALUpdater<MigrateTableStateme
     
     @Override
     public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
-        log.info("start migrate job by {}", sqlStatement);
         String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
         Preconditions.checkNotNull(targetDatabaseName, "Target database name is null. You could define it in DistSQL or select a database.");
         JOB_API.createJobAndStart(new CreateMigrationJobParameter(
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index f779b23db14..89ad3b69224 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -47,7 +47,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     
     @Override
     public void start() {
-        log.info("start lifecycle executor {}", super.toString());
         running = true;
         startTimeMillis = System.currentTimeMillis();
         runBlocking();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index f23f504e07f..525ba120cc3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -160,7 +160,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
         InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
         if (null == jobItemProgress) {
-            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
         }
         jobItemProgress.setStatus(status);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 0daae87d451..bd72448ba09 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -88,7 +88,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     public Optional<String> start(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
         ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
-        log.info("Start job by {}", jobConfig);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
         if (repositoryAPI.isExisted(jobConfigKey)) {
@@ -119,7 +118,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     @Override
     public void startDisabledJob(final String jobId) {
-        log.info("Start disabled pipeline job {}", jobId);
         pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
@@ -135,7 +133,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     @Override
     public void stop(final String jobId) {
-        log.info("Stop pipeline job {}", jobId);
         pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
@@ -148,7 +145,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     }
     
     protected void dropJob(final String jobId) {
-        log.info("Drop job {}", jobId);
         PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
         PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index b34774e04d2..0e27980841d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -72,7 +72,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public void persistCheckLatestJobId(final String jobId, final String checkJobId) {
-        log.info("persist check job id '{}' for job {}", checkJobId, jobId);
         repository.persist(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), String.valueOf(checkJobId));
     }
     
@@ -95,10 +94,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     @Override
     public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
         if (null == checkResultMap) {
-            log.warn("checkResultMap is null, jobId {}, checkJobId {}", jobId, checkJobId);
             return;
         }
-        log.info("persist check job result for job {}", checkJobId);
         Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
         for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
             YamlDataConsistencyCheckResult yamlCheckResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
@@ -109,7 +106,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public void deleteCheckJobResult(final String jobId, final String checkJobId) {
-        log.info("deleteCheckJobResult, jobId={}, checkJobId={}", jobId, checkJobId);
         repository.delete(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
     }
     
@@ -120,7 +116,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public void deleteJob(final String jobId) {
-        log.info("delete job {}", jobId);
         repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
     }
     
@@ -142,7 +137,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     @Override
     public List<Integer> getShardingItems(final String jobId) {
         List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
-        log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
         return result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index 2531d0ebf02..e0600b895bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 
 import java.util.Iterator;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Streaming data consistency calculate algorithm.
@@ -67,8 +66,6 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends
         
         private final DataConsistencyCalculateParameter param;
         
-        private final AtomicInteger calculationCount = new AtomicInteger(0);
-        
         private volatile Optional<DataConsistencyCalculatedResult> nextResult;
         
         @Override
@@ -91,12 +88,6 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends
                 return;
             }
             nextResult = calculateChunk(param);
-            if (!nextResult.isPresent()) {
-                log.info("nextResult not present, calculation done. calculationCount={}", calculationCount);
-            }
-            if (0 == calculationCount.incrementAndGet() % 100_0000) {
-                log.warn("possible infinite loop, calculationCount={}", calculationCount);
-            }
         }
     }
 }
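
For context: the calculator above streams results chunk by chunk and stops once calculateChunk returns an empty Optional; the removed AtomicInteger only instrumented that loop. A minimal sketch of the same pull-based iteration, with a hypothetical fetchChunk supplier standing in for calculateChunk:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Optional;
    import java.util.function.Supplier;
    
    // Streams chunks from a supplier until it yields Optional.empty().
    final class ChunkIterator<T> implements Iterator<T> {
        
        private final Supplier<Optional<T>> fetchChunk;
        
        private Optional<T> next = Optional.empty();
        
        private boolean fetched;
        
        ChunkIterator(final Supplier<Optional<T>> fetchChunk) {
            this.fetchChunk = fetchChunk;
        }
        
        @Override
        public boolean hasNext() {
            // Compute the next chunk lazily, once per step.
            if (!fetched) {
                next = fetchChunk.get();
                fetched = true;
            }
            return next.isPresent();
        }
        
        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            fetched = false;
            return next.get();
        }
    }
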
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 92492a3099f..971e28c304e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -196,7 +196,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 return false;
             }
             EqualsBuilder equalsBuilder = new EqualsBuilder();
-            Iterator<Collection<Object>> thisIterator = this.records.iterator();
+            Iterator<Collection<Object>> thisIterator = records.iterator();
             Iterator<Collection<Object>> thatIterator = that.records.iterator();
             while (thisIterator.hasNext() && thatIterator.hasNext()) {
                 equalsBuilder.reset();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index a06c465aa99..0726c84007a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -66,7 +66,6 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
     
     private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
         String sql = getSQLBuilder().buildCheckEmptySQL(schemaName, tableName);
-        log.info("checkEmpty, sql={}", sql);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index f31b6e1142a..20d53b95f57 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -91,34 +91,18 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
     
     @Override
     protected void runBlocking() {
-        write();
-    }
-    
-    private void write() {
-        log.info("importer write");
-        int round = 1;
-        int rowCount = 0;
-        boolean finishedByBreak = false;
         int batchSize = importerConfig.getBatchSize() * 2;
         while (isRunning()) {
             List<Record> records = channel.fetchRecords(batchSize, 3);
             if (null != records && !records.isEmpty()) {
-                round++;
-                rowCount += records.size();
                 PipelineJobProgressUpdatedParameter updatedParam = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
                 channel.ack(records);
                 jobProgressListener.onProgressUpdated(updatedParam);
-                if (0 == round % 50) {
-                    log.info("importer write, round={}, rowCount={}", round, rowCount);
-                }
                 if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
-                    log.info("write, get FinishedRecord, break");
-                    finishedByBreak = true;
                     break;
                 }
             }
         }
-        log.info("importer write done, rowCount={}, finishedByBreak={}", rowCount, finishedByBreak);
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
@@ -264,10 +248,8 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
     
     @Override
     protected void doStop() throws SQLException {
-        final long startTimeMillis = System.currentTimeMillis();
         cancelStatement(batchInsertStatement);
         cancelStatement(updateStatement);
         cancelStatement(batchDeleteStatement);
-        log.info("doStop cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
 }
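
With the counters gone, the importer loop above reduces to: fetch a batch, flush it, ack, and stop at the trailing FinishedRecord. A self-contained sketch of that shape, using a BlockingQueue as a stand-in for the pipeline channel (the Record types here are simplified placeholders, not the real API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    
    interface Record {
    }
    
    final class FinishedRecord implements Record {
    }
    
    final class BatchConsumer {
        
        // Takes at least one record, drains up to batchSize in total, flushes,
        // and stops once a batch ends with the terminating FinishedRecord.
        static void consume(final BlockingQueue<Record> channel, final int batchSize) throws InterruptedException {
            while (true) {
                List<Record> records = new ArrayList<>(batchSize);
                records.add(channel.take());
                channel.drainTo(records, batchSize - 1);
                flush(records);
                if (records.get(records.size() - 1) instanceof FinishedRecord) {
                    break;
                }
            }
        }
        
        private static void flush(final List<Record> records) {
            // The real importer writes the batch to the target data source,
            // acks it on the channel, and notifies the progress listener here.
        }
    }
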
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 30e719febbd..678be37b134 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -93,7 +93,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         String firstSQL = buildInventoryDumpSQL(true);
         String laterSQL = buildInventoryDumpSQL(false);
         IngestPosition<?> position = dumperConfig.getPosition();
-        log.info("Inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}.", dumperConfig.getUniqueKeyDataType(), firstSQL, laterSQL, position);
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
@@ -106,7 +105,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
             while ((maxUniqueKeyValue = dump(tableMetaData, connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
                 beginUniqueKeyValue = maxUniqueKeyValue.get();
                 if (!isRunning()) {
-                    log.info("Broke because of inventory dump is not running.");
                     break;
                 }
             }
@@ -115,7 +113,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
             log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException(ex);
         } finally {
-            log.info("Inventory dump, before put FinishedRecord.");
             channel.pushRecord(new FinishedRecord(new FinishedPosition()));
         }
     }
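
The dump loop above is keyset pagination: an initial query without a lower bound, then repeated queries resuming after the largest unique-key value seen, until a round returns no rows. A bare-bones JDBC sketch of that loop (table and column names are placeholders):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    final class KeysetDump {
        
        // Pages through t_order by id, resuming each round after the largest id
        // of the previous batch, until a round comes back empty.
        static void dump(final Connection connection, final int batchSize) throws SQLException {
            long lastId = Long.MIN_VALUE;
            while (true) {
                long maxId = lastId;
                try (PreparedStatement statement = connection.prepareStatement(
                        "SELECT id, payload FROM t_order WHERE id > ? ORDER BY id LIMIT ?")) {
                    statement.setLong(1, lastId);
                    statement.setInt(2, batchSize);
                    try (ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            maxId = resultSet.getLong("id");
                            // The real dumper turns each row into a record and pushes it to the channel.
                        }
                    }
                }
                if (maxId == lastId) {
                    break;
                }
                lastId = maxId;
            }
        }
    }
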
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 03ca0efd927..3aa0731dd7a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -67,9 +67,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     protected void prepare(final PipelineJobItemContext jobItemContext) {
         try {
-            long startTimeMillis = System.currentTimeMillis();
             doPrepare(jobItemContext);
-            log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index 22b8832a449..94d74fd0b83 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -43,7 +43,6 @@ public final class PipelineJobCenter {
      * @param job job
      */
     public static void addJob(final String jobId, final PipelineJob job) {
-        log.info("add job, jobId={}", jobId);
         JOB_MAP.put(jobId, job);
     }
     
@@ -65,11 +64,9 @@ public final class PipelineJobCenter {
     public static void stop(final String jobId) {
         PipelineJob job = JOB_MAP.get(jobId);
         if (null == job) {
-            log.info("job is null, ignore, jobId={}", jobId);
             return;
         }
         job.stop();
-        log.info("remove job, jobId={}", jobId);
         JOB_MAP.remove(jobId);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index c57e946ccd0..263e71e5789 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -56,7 +56,6 @@ public final class PipelineJobProgressPersistService {
      * @param jobId job id
      */
     public static void removeJobProgressPersistContext(final String jobId) {
-        log.info("Remove job progress persist context, jobId={}", jobId);
         JOB_PROGRESS_PERSIST_MAP.remove(jobId);
     }
     
@@ -67,7 +66,6 @@ public final class PipelineJobProgressPersistService {
      * @param shardingItem sharding item
      */
     public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
-        log.info("Add job progress persist context, jobId={}, shardingItem={}", jobId, shardingItem);
         JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
     }
     
@@ -81,7 +79,6 @@ public final class PipelineJobProgressPersistService {
         Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
         PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
         if (null == persistContext) {
-            log.debug("persistContext is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
         }
         persistContext.getHasNewEvents().set(true);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index a2785126509..45f2b98efe5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -72,14 +72,14 @@ public final class PipelineDDLGenerator {
      */
     public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
                                    final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
-        log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
         long startTimeMillis = System.currentTimeMillis();
         StringBuilder result = new StringBuilder();
         for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
             Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
         }
-        log.info("generateLogicDDL cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}, cost {} ms",
+                databaseType.getType(), schemaName, sourceTableName, targetTableName, System.currentTimeMillis() - startTimeMillis);
         return result.toString();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 53c3dad8223..851fb190821 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -79,8 +79,6 @@ public final class StandardPipelineTableMetaDataLoader implements PipelineTableM
             long startMillis = System.currentTimeMillis();
             String schemaNameFinal = isSchemaAvailable() ? schemaName : null;
             Map<TableName, PipelineTableMetaData> tableMetaDataMap = loadTableMetaData0(connection, schemaNameFinal, tableNamePattern);
-            log.info("loadTableMetaData, schemaNameFinal={}, tableNamePattern={}, result={}, cost time={} ms",
-                    schemaNameFinal, tableNamePattern, tableMetaDataMap, System.currentTimeMillis() - startMillis);
             this.tableMetaDataMap.putAll(tableMetaDataMap);
         }
     }
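
Two of the hunks above adjust timing logs: PipelineDDLGenerator folds its separate entry and cost lines into a single completion line, and the metadata loader drops its cost line entirely. For reference, the consolidated pattern looks like this with SLF4J (class and argument names invented for the example):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    final class TimedOperation {
        
        private static final Logger log = LoggerFactory.getLogger(TimedOperation.class);
        
        // One line on completion carries both the parameters and the elapsed time,
        // instead of a "start" line plus a "cost" line.
        String run(final String schemaName, final String tableName) {
            long startTimeMillis = System.currentTimeMillis();
            String result = doRun(schemaName, tableName);
            log.info("run, schemaName={}, tableName={}, cost {} ms", schemaName, tableName, System.currentTimeMillis() - startTimeMillis);
            return result;
        }
        
        private String doRun(final String schemaName, final String tableName) {
            return schemaName + "." + tableName;
        }
    }
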
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index ff60d0765d0..a4e9fc2a66b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -172,13 +172,11 @@ public final class InventoryTaskSplitter {
                 preparedStatement.setLong(2, shardingSize);
                 try (ResultSet resultSet = preparedStatement.executeQuery()) {
                     if (!resultSet.next()) {
-                        log.info("getPositionByPrimaryKeyRange, resultSet's next return false, break");
                         break;
                     }
                     long endId = resultSet.getLong(1);
                     recordsCount += resultSet.getLong(2);
                     if (0 == endId) {
-                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
                         break;
                     }
                     result.add(new IntegerPrimaryKeyPosition(beginId, endId));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index e1284c59298..d62c70f4fc2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -80,9 +80,7 @@ public final class PipelineJobPreparerUtils {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        long startTimeMillis = System.currentTimeMillis();
         dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParam);
-        log.info("prepareTargetSchema cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -134,10 +132,7 @@ public final class PipelineJobPreparerUtils {
         }
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        long startTimeMillis = System.currentTimeMillis();
-        IngestPosition<?> result = PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
-        log.info("getIncrementalPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
-        return result;
+        return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
     }
     
     /**
@@ -147,16 +142,13 @@ public final class PipelineJobPreparerUtils {
      * @param dataSources data source
      */
     public static void checkSourceDataSource(final String databaseType, final Collection<? extends DataSource> dataSources) {
-        if (null == dataSources || dataSources.isEmpty()) {
-            log.info("source data source is empty, skip check");
+        if (dataSources.isEmpty()) {
             return;
         }
-        final long startTimeMillis = System.currentTimeMillis();
         DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
         dataSourceChecker.checkConnection(dataSources);
         dataSourceChecker.checkPrivilege(dataSources);
         dataSourceChecker.checkVariable(dataSources);
-        log.info("checkSourceDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -172,10 +164,8 @@ public final class PipelineJobPreparerUtils {
             log.info("target data source is empty, skip check");
             return;
         }
-        long startTimeMillis = System.currentTimeMillis();
         dataSourceChecker.checkConnection(targetDataSources);
         dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
-        log.info("checkTargetDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 1fde15978c8..54c8f9013fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -56,7 +56,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) {
         DatabaseType targetDatabaseType = param.getTargetDatabaseType();
         if (!targetDatabaseType.isSchemaAvailable()) {
-            log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
             return;
         }
         CreateTableConfiguration createTableConfig = param.getCreateTableConfig();
@@ -74,7 +73,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
                 createdSchemaNames.add(targetSchemaName);
             }
         }
-        log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
     }
     
     private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9f963e4fd64..89518f60a2f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -120,7 +120,6 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
             
             @Override
             public void onSuccess() {
-                log.info("incremental dumper onSuccess, taskId={}", taskId);
             }
             
             @Override
@@ -134,7 +133,6 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
             
             @Override
             public void onSuccess() {
-                log.info("importer onSuccess, taskId={}", taskId);
             }
             
             @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 32f653efa52..875f9faac39 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -59,7 +59,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     @Override
     public void stop() {
         jobItemContext.setStopping(true);
-        log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
         for (InventoryTask each : inventoryTasks) {
             each.stop();
             each.close();
@@ -73,13 +72,11 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     @Override
     public void start() {
         if (jobItemContext.isStopping()) {
-            log.info("job stopping, ignore inventory task");
             return;
         }
         PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
         if (executeInventoryTask()) {
             if (jobItemContext.isStopping()) {
-                log.info("stopping, ignore incremental task");
                 return;
             }
             executeIncrementalTask();
@@ -91,7 +88,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             log.info("All inventory tasks finished.");
             return true;
         }
-        log.info("-------------- Start inventory task --------------");
         updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (InventoryTask each : inventoryTasks) {
@@ -136,7 +132,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
             return;
         }
-        log.info("-------------- Start incremental task --------------");
         updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (IncrementalTask each : incrementalTasks) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 0d31db298eb..5795d74a8cc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -92,7 +92,6 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
             
             @Override
             public void onSuccess() {
-                log.info("dumper onSuccess, taskId={}", taskId);
             }
             
             @Override
@@ -106,7 +105,6 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
             
             @Override
             public void onSuccess() {
-                log.info("importer onSuccess, taskId={}", taskId);
             }
             
             @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 7793b9914fb..bf0553f9e69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -85,7 +85,6 @@ public final class PipelineDistributedBarrier {
      */
     public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
         if (!getRepository().isExisted(barrierPath)) {
-            log.info("barrier path {} not exist, ignore", barrierPath);
             return;
         }
         String key = String.join("/", barrierPath, Integer.toString(shardingItem));
@@ -145,7 +144,6 @@ public final class PipelineDistributedBarrier {
             return;
         }
         List<String> childrenKeys = getRepository().getChildrenKeys(barrierPath);
-        log.info("children keys: {}, total count: {}", childrenKeys, holder.getTotalCount());
         if (childrenKeys.size() == holder.getTotalCount()) {
             holder.getCountDownLatch().countDown();
         }
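
The await logic above releases a CountDownLatch once the number of registered children reaches the expected total. Stripped of the registry center, the core bookkeeping is roughly:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    final class SimpleBarrier {
        
        private final int totalCount;
        
        private final Set<Integer> arrived = ConcurrentHashMap.newKeySet();
        
        private final CountDownLatch latch = new CountDownLatch(1);
        
        SimpleBarrier(final int totalCount) {
            this.totalCount = totalCount;
        }
        
        // Each sharding item registers once; the last arrival opens the barrier.
        void register(final int shardingItem) {
            arrived.add(shardingItem);
            if (arrived.size() == totalCount) {
                latch.countDown();
            }
        }
        
        boolean await(final long timeoutMillis) throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
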
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index fe235973243..462b1582119 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -44,7 +44,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
     public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
-            log.info("{} is disabled", jobId);
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
             for (Integer each : shardingItems) {
@@ -58,7 +57,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    log.info("{} executing jobs", jobId);
                     CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
                         if (null != throwable) {
                             log.error("execute failed, jobId={}", jobId, throwable);
@@ -67,7 +65,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
                 }
                 break;
             case DELETED:
-                log.info("deleted consistency check job id: {}", jobId);
                 PipelineJobCenter.stop(jobId);
                 break;
             default:
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index c52622693d2..ccf9c8583f0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -75,7 +75,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     @Override
     public void start() {
         if (jobItemContext.isStopping()) {
-            log.info("job stopping, ignore consistency check");
             return;
         }
         PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
@@ -86,7 +85,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     @Override
     public void stop() {
         jobItemContext.setStopping(true);
-        log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
         checkExecutor.stop();
     }
     
@@ -94,7 +92,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         
         @Override
         protected void runBlocking() {
-            log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
             checkJobAPI.persistJobItemProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(jobType);
@@ -110,7 +107,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         @Override
         protected void doStop() throws SQLException {
             DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
-            log.info("doStop, algorithm={}", algorithm);
             if (null != algorithm) {
                 algorithm.cancel();
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 6cb937a2970..58098ad7ed8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -45,7 +45,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
     public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
-            log.info("{} is disabled", jobId);
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
             for (Integer each : shardingItems) {
@@ -59,7 +58,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    log.info("{} executing jobs", jobId);
                     CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
                         if (null != throwable) {
                             log.error("execute failed, jobId={}", jobId, throwable);
@@ -68,7 +66,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 }
                 break;
             case DELETED:
-                log.info("deleted jobId={}", jobId);
                 new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()));
                 PipelineJobCenter.stop(jobId);
                 break;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a1224ac2b67..2894967e8fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -86,7 +86,6 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
             long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
             checkJobItemContext.setRecordsCount(recordsCount);
             checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
-            log.info("consistency check, get records count: {}", recordsCount);
             PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
             SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
                     sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index d1f88d38014..b7bd0fdbef4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -192,9 +192,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         Map<LogicTableName, Set<String>> shardingColumnsMap = ShardingColumnsExtractorFactory.getInstance().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new LogicTableName(jobConfig.getTargetTableName())));
         ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, tableNameSchemaNameMapping);
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(jobConfig.getSourceResourceName(), createTableConfig, dumperConfig, importerConfig);
-        log.info("buildTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
-        return result;
+        return new MigrationTaskConfiguration(jobConfig.getSourceResourceName(), createTableConfig, dumperConfig, importerConfig);
     }
     
     private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
@@ -278,13 +276,12 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     
     @Override
     public void rollback(final String jobId) throws SQLException {
-        log.info("Rollback job {}", jobId);
         final long startTimeMillis = System.currentTimeMillis();
         dropCheckJobs(jobId);
         stop(jobId);
         cleanTempTableOnRollback(jobId);
         dropJob(jobId);
-        log.info("Rollback cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
     }
     
     private void dropCheckJobs(final String jobId) {
@@ -292,8 +289,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         if (checkJobIds.isEmpty()) {
             return;
         }
-        log.info("dropCheckJobs start...");
-        long startTimeMillis = System.currentTimeMillis();
         for (String each : checkJobIds) {
             try {
                 dropJob(each);
@@ -303,7 +298,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
                 log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
             }
         }
-        log.info("dropCheckJobs cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
@@ -336,7 +330,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     
     @Override
     public void addMigrationSourceResources(final Map<String, DataSourceProperties> dataSourcePropsMap) {
-        log.info("Add migration source resources {}", dataSourcePropsMap.keySet());
         Map<String, DataSourceProperties> existDataSources = dataSourcePersistService.load(getJobType());
         Collection<String> duplicateDataSourceNames = new HashSet<>(dataSourcePropsMap.size(), 1);
         for (Entry<String, DataSourceProperties> entry : dataSourcePropsMap.entrySet()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 122a20d5b77..aa715f0c95a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -64,20 +64,17 @@ public final class MigrationJobPreparer {
     public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
-            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
-            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
         if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
-                log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
                 PipelineJobCenter.stop(jobItemContext.getJobId());
                 return;
             }
@@ -103,7 +100,6 @@ public final class MigrationJobPreparer {
                 boolean prepareFlag = JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
                         || JobStatus.PREPARING_FAILURE.equals(jobItemProgress.getStatus());
                 if (prepareFlag) {
-                    log.info("execute prepare, jobId={}, shardingItem={}, jobStatus={}", jobConfig.getJobId(), jobItemContext.getShardingItem(), jobItemProgress.getStatus());
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     JOB_API.updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
@@ -121,7 +117,6 @@ public final class MigrationJobPreparer {
     
     private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
-            log.info("prepare target ...");
             prepareTarget(jobItemContext);
         }
         InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
index 7445a132a37..b202edf176a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
@@ -54,9 +54,7 @@ public final class MySQLJdbcQueryPropertiesExtension implements JdbcQueryPropert
     private static String initMysqlConnectorVersion() {
         try {
             Class<?> mysqlDriverClass = MySQLJdbcQueryPropertiesExtension.class.getClassLoader().loadClass("com.mysql.jdbc.Driver");
-            String result = mysqlDriverClass.getPackage().getImplementationVersion();
-            log.info("mysql connector version {}", result);
-            return result;
+            return mysqlDriverClass.getPackage().getImplementationVersion();
         } catch (final ClassNotFoundException ex) {
             log.warn("not find com.mysql.jdbc.Driver class");
             return null;
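
The version probe above reads Implementation-Version from the driver JAR's manifest through the loaded class's Package metadata. The same technique works for any library on the classpath, e.g. implementationVersion("com.mysql.jdbc.Driver"):

    final class DriverVersions {
        
        // Reads a library's version from its JAR manifest via the loaded class's
        // Package; returns null if the class or the manifest entry is absent.
        static String implementationVersion(final String className) {
            try {
                Package pkg = Class.forName(className).getPackage();
                return null == pkg ? null : pkg.getImplementationVersion();
            } catch (final ClassNotFoundException ex) {
                return null;
            }
        }
    }
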
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 2e5f432d33d..5a4d53ffdeb 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -91,39 +91,35 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
-        int eventCount = 0;
         while (isRunning()) {
             AbstractBinlogEvent event = client.poll();
             if (null == event) {
                 continue;
             }
-            eventCount += handleEvent(event);
+            handleEvent(event);
         }
-        log.info("incremental dump, eventCount={}", eventCount);
         channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
     }
     
-    private int handleEvent(final AbstractBinlogEvent event) {
+    private void handleEvent(final AbstractBinlogEvent event) {
         if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) event).getDatabaseName().equals(catalog) || !dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
             createPlaceholderRecord(event);
-            return 0;
+            return;
         }
         if (event instanceof WriteRowsEvent) {
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
             handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
-            return 1;
+            return;
         }
         if (event instanceof UpdateRowsEvent) {
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
             handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
-            return 1;
+            return;
         }
         if (event instanceof DeleteRowsEvent) {
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
             handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
-            return 1;
         }
-        return 0;
     }
     
     private void createPlaceholderRecord(final AbstractBinlogEvent event) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 3df3d068be7..6ab8369812b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -283,7 +283,6 @@ public final class MySQLClient {
         
         @Override
         public void channelInactive(final ChannelHandlerContext ctx) {
-            log.warn("channel inactive");
             if (!running) {
                 return;
             }
@@ -305,12 +304,11 @@ public final class MySQLClient {
                 running = false;
                 return;
             }
-            int retryTimes = reconnectTimes.incrementAndGet();
+            reconnectTimes.incrementAndGet();
             if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
                 log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
                 return;
             }
-            log.info("reconnect MySQL client, retry times={}", retryTimes);
             closeChannel();
             connect();
             subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
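
The reconnect path above keeps an attempt counter and resumes the binlog stream from the last confirmed (file, position) pair. A rough sketch of that bookkeeping, with hypothetical connect/subscribe stubs:

    import java.util.concurrent.atomic.AtomicInteger;
    
    final class Reconnector {
        
        private static final int MAX_RETRIES = 3;
        
        private final AtomicInteger reconnectTimes = new AtomicInteger(0);
        
        // Gives up after MAX_RETRIES attempts; otherwise reconnects and resumes
        // from the last confirmed binlog position.
        boolean reconnect(final String lastFileName, final long lastPosition) {
            if (reconnectTimes.incrementAndGet() > MAX_RETRIES) {
                return false;
            }
            connect();
            subscribe(lastFileName, lastPosition);
            return true;
        }
        
        private void connect() {
            // The real client re-establishes the network channel here.
        }
        
        private void subscribe(final String fileName, final long position) {
            // The real client restarts the binlog dump from (fileName, position) here.
        }
    }
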
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index abfad6e74bf..d31d48c8a90 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -37,7 +37,7 @@ import java.util.Collections;
  * PostgreSQL Data source checker.
  */
 @Slf4j
-public class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker {
+public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker {
     
     private static final String SHOW_GRANTS_SQL = "SELECT * FROM pg_roles WHERE rolname = ?";