You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/12 11:21:13 UTC
[shardingsphere] branch master updated: Clean pipeline log (#22110)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 790e9d27806 Clean pipeline log (#22110)
790e9d27806 is described below
commit 790e9d278064b2a41c3d2d5ec610a9bd3c8ca3da
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Nov 12 19:21:04 2022 +0800
Clean pipeline log (#22110)
* Improve code style
* Clean pipeline log
---
.../distsql/handler/update/MigrateTableUpdater.java | 1 -
.../api/executor/AbstractLifecycleExecutor.java | 1 -
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 1 -
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 4 ----
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 6 ------
...ractStreamingDataConsistencyCalculateAlgorithm.java | 9 ---------
.../DataMatchDataConsistencyCalculateAlgorithm.java | 2 +-
.../check/datasource/AbstractDataSourceChecker.java | 1 -
.../data/pipeline/core/importer/DefaultImporter.java | 18 ------------------
.../pipeline/core/ingest/dumper/InventoryDumper.java | 3 ---
.../data/pipeline/core/job/AbstractPipelineJob.java | 2 --
.../data/pipeline/core/job/PipelineJobCenter.java | 3 ---
.../persist/PipelineJobProgressPersistService.java | 3 ---
.../core/metadata/generator/PipelineDDLGenerator.java | 4 ++--
.../loader/StandardPipelineTableMetaDataLoader.java | 2 --
.../pipeline/core/prepare/InventoryTaskSplitter.java | 2 --
.../core/prepare/PipelineJobPreparerUtils.java | 14 ++------------
.../prepare/datasource/AbstractDataSourcePreparer.java | 2 --
.../data/pipeline/core/task/IncrementalTask.java | 2 --
.../core/task/InventoryIncrementalTasksRunner.java | 5 -----
.../data/pipeline/core/task/InventoryTask.java | 2 --
.../pipeline/core/util/PipelineDistributedBarrier.java | 2 --
...nsistencyCheckChangedJobConfigurationProcessor.java | 3 ---
.../consistencycheck/ConsistencyCheckTasksRunner.java | 4 ----
.../MigrationChangedJobConfigurationProcessor.java | 3 ---
.../migration/MigrationDataConsistencyChecker.java | 1 -
.../scenario/migration/MigrationJobAPIImpl.java | 11 ++---------
.../scenario/migration/MigrationJobPreparer.java | 5 -----
.../datasource/MySQLJdbcQueryPropertiesExtension.java | 4 +---
.../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 14 +++++---------
.../data/pipeline/mysql/ingest/client/MySQLClient.java | 4 +---
.../check/datasource/PostgreSQLDataSourceChecker.java | 2 +-
32 files changed, 15 insertions(+), 125 deletions(-)
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index a79ccbf9b71..07b8e3c8186 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -35,7 +35,6 @@ public final class MigrateTableUpdater implements RALUpdater<MigrateTableStateme
@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
- log.info("start migrate job by {}", sqlStatement);
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
Preconditions.checkNotNull(targetDatabaseName, "Target database name is null. You could define it in DistSQL or select a database.");
JOB_API.createJobAndStart(new CreateMigrationJobParameter(
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index f779b23db14..89ad3b69224 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -47,7 +47,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
@Override
public void start() {
- log.info("start lifecycle executor {}", super.toString());
running = true;
startTimeMillis = System.currentTimeMillis();
runBlocking();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index f23f504e07f..525ba120cc3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -160,7 +160,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
if (null == jobItemProgress) {
- log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.setStatus(status);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 0daae87d451..bd72448ba09 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -88,7 +88,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
- log.info("Start job by {}", jobConfig);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
@@ -119,7 +118,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
- log.info("Start disabled pipeline job {}", jobId);
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
@@ -135,7 +133,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@Override
public void stop(final String jobId) {
- log.info("Stop pipeline job {}", jobId);
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
@@ -148,7 +145,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
}
protected void dropJob(final String jobId) {
- log.info("Drop job {}", jobId);
PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index b34774e04d2..0e27980841d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -72,7 +72,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public void persistCheckLatestJobId(final String jobId, final String checkJobId) {
- log.info("persist check job id '{}' for job {}", checkJobId, jobId);
repository.persist(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), String.valueOf(checkJobId));
}
@@ -95,10 +94,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
if (null == checkResultMap) {
- log.warn("checkResultMap is null, jobId {}, checkJobId {}", jobId, checkJobId);
return;
}
- log.info("persist check job result for job {}", checkJobId);
Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
YamlDataConsistencyCheckResult yamlCheckResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
@@ -109,7 +106,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public void deleteCheckJobResult(final String jobId, final String checkJobId) {
- log.info("deleteCheckJobResult, jobId={}, checkJobId={}", jobId, checkJobId);
repository.delete(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
}
@@ -120,7 +116,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public void deleteJob(final String jobId) {
- log.info("delete job {}", jobId);
repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
}
@@ -142,7 +137,6 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public List<Integer> getShardingItems(final String jobId) {
List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
- log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
return result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index 2531d0ebf02..e0600b895bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import java.util.Iterator;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Streaming data consistency calculate algorithm.
@@ -67,8 +66,6 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends
private final DataConsistencyCalculateParameter param;
- private final AtomicInteger calculationCount = new AtomicInteger(0);
-
private volatile Optional<DataConsistencyCalculatedResult> nextResult;
@Override
@@ -91,12 +88,6 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends
return;
}
nextResult = calculateChunk(param);
- if (!nextResult.isPresent()) {
- log.info("nextResult not present, calculation done. calculationCount={}", calculationCount);
- }
- if (0 == calculationCount.incrementAndGet() % 100_0000) {
- log.warn("possible infinite loop, calculationCount={}", calculationCount);
- }
}
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 92492a3099f..971e28c304e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -196,7 +196,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
return false;
}
EqualsBuilder equalsBuilder = new EqualsBuilder();
- Iterator<Collection<Object>> thisIterator = this.records.iterator();
+ Iterator<Collection<Object>> thisIterator = records.iterator();
Iterator<Collection<Object>> thatIterator = that.records.iterator();
while (thisIterator.hasNext() && thatIterator.hasNext()) {
equalsBuilder.reset();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index a06c465aa99..0726c84007a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -66,7 +66,6 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
String sql = getSQLBuilder().buildCheckEmptySQL(schemaName, tableName);
- log.info("checkEmpty, sql={}", sql);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index f31b6e1142a..20d53b95f57 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -91,34 +91,18 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
@Override
protected void runBlocking() {
- write();
- }
-
- private void write() {
- log.info("importer write");
- int round = 1;
- int rowCount = 0;
- boolean finishedByBreak = false;
int batchSize = importerConfig.getBatchSize() * 2;
while (isRunning()) {
List<Record> records = channel.fetchRecords(batchSize, 3);
if (null != records && !records.isEmpty()) {
- round++;
- rowCount += records.size();
PipelineJobProgressUpdatedParameter updatedParam = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
channel.ack(records);
jobProgressListener.onProgressUpdated(updatedParam);
- if (0 == round % 50) {
- log.info("importer write, round={}, rowCount={}", round, rowCount);
- }
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
- log.info("write, get FinishedRecord, break");
- finishedByBreak = true;
break;
}
}
}
- log.info("importer write done, rowCount={}, finishedByBreak={}", rowCount, finishedByBreak);
}
private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
@@ -264,10 +248,8 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
@Override
protected void doStop() throws SQLException {
- final long startTimeMillis = System.currentTimeMillis();
cancelStatement(batchInsertStatement);
cancelStatement(updateStatement);
cancelStatement(batchDeleteStatement);
- log.info("doStop cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 30e719febbd..678be37b134 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -93,7 +93,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
String firstSQL = buildInventoryDumpSQL(true);
String laterSQL = buildInventoryDumpSQL(false);
IngestPosition<?> position = dumperConfig.getPosition();
- log.info("Inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}.", dumperConfig.getUniqueKeyDataType(), firstSQL, laterSQL, position);
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
@@ -106,7 +105,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
while ((maxUniqueKeyValue = dump(tableMetaData, connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
beginUniqueKeyValue = maxUniqueKeyValue.get();
if (!isRunning()) {
- log.info("Broke because of inventory dump is not running.");
break;
}
}
@@ -115,7 +113,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException(ex);
} finally {
- log.info("Inventory dump, before put FinishedRecord.");
channel.pushRecord(new FinishedRecord(new FinishedPosition()));
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 03ca0efd927..3aa0731dd7a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -67,9 +67,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
protected void prepare(final PipelineJobItemContext jobItemContext) {
try {
- long startTimeMillis = System.currentTimeMillis();
doPrepare(jobItemContext);
- log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index 22b8832a449..94d74fd0b83 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -43,7 +43,6 @@ public final class PipelineJobCenter {
* @param job job
*/
public static void addJob(final String jobId, final PipelineJob job) {
- log.info("add job, jobId={}", jobId);
JOB_MAP.put(jobId, job);
}
@@ -65,11 +64,9 @@ public final class PipelineJobCenter {
public static void stop(final String jobId) {
PipelineJob job = JOB_MAP.get(jobId);
if (null == job) {
- log.info("job is null, ignore, jobId={}", jobId);
return;
}
job.stop();
- log.info("remove job, jobId={}", jobId);
JOB_MAP.remove(jobId);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index c57e946ccd0..263e71e5789 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -56,7 +56,6 @@ public final class PipelineJobProgressPersistService {
* @param jobId job id
*/
public static void removeJobProgressPersistContext(final String jobId) {
- log.info("Remove job progress persist context, jobId={}", jobId);
JOB_PROGRESS_PERSIST_MAP.remove(jobId);
}
@@ -67,7 +66,6 @@ public final class PipelineJobProgressPersistService {
* @param shardingItem sharding item
*/
public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
- log.info("Add job progress persist context, jobId={}, shardingItem={}", jobId, shardingItem);
JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
}
@@ -81,7 +79,6 @@ public final class PipelineJobProgressPersistService {
Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
if (null == persistContext) {
- log.debug("persistContext is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
persistContext.getHasNewEvents().set(true);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index a2785126509..45f2b98efe5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -72,14 +72,14 @@ public final class PipelineDDLGenerator {
*/
public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
- log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
- log.info("generateLogicDDL cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}, cost {} ms",
+ databaseType.getType(), schemaName, sourceTableName, targetTableName, System.currentTimeMillis() - startTimeMillis);
return result.toString();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 53c3dad8223..851fb190821 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -79,8 +79,6 @@ public final class StandardPipelineTableMetaDataLoader implements PipelineTableM
long startMillis = System.currentTimeMillis();
String schemaNameFinal = isSchemaAvailable() ? schemaName : null;
Map<TableName, PipelineTableMetaData> tableMetaDataMap = loadTableMetaData0(connection, schemaNameFinal, tableNamePattern);
- log.info("loadTableMetaData, schemaNameFinal={}, tableNamePattern={}, result={}, cost time={} ms",
- schemaNameFinal, tableNamePattern, tableMetaDataMap, System.currentTimeMillis() - startMillis);
this.tableMetaDataMap.putAll(tableMetaDataMap);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index ff60d0765d0..a4e9fc2a66b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -172,13 +172,11 @@ public final class InventoryTaskSplitter {
preparedStatement.setLong(2, shardingSize);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
- log.info("getPositionByPrimaryKeyRange, resultSet's next return false, break");
break;
}
long endId = resultSet.getLong(1);
recordsCount += resultSet.getLong(2);
if (0 == endId) {
- log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
break;
}
result.add(new IntegerPrimaryKeyPosition(beginId, endId));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index e1284c59298..d62c70f4fc2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -80,9 +80,7 @@ public final class PipelineJobPreparerUtils {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
- long startTimeMillis = System.currentTimeMillis();
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParam);
- log.info("prepareTargetSchema cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -134,10 +132,7 @@ public final class PipelineJobPreparerUtils {
}
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- long startTimeMillis = System.currentTimeMillis();
- IngestPosition<?> result = PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
- log.info("getIncrementalPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
- return result;
+ return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
}
/**
@@ -147,16 +142,13 @@ public final class PipelineJobPreparerUtils {
* @param dataSources data source
*/
public static void checkSourceDataSource(final String databaseType, final Collection<? extends DataSource> dataSources) {
- if (null == dataSources || dataSources.isEmpty()) {
- log.info("source data source is empty, skip check");
+ if (dataSources.isEmpty()) {
return;
}
- final long startTimeMillis = System.currentTimeMillis();
DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
dataSourceChecker.checkConnection(dataSources);
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
- log.info("checkSourceDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -172,10 +164,8 @@ public final class PipelineJobPreparerUtils {
log.info("target data source is empty, skip check");
return;
}
- long startTimeMillis = System.currentTimeMillis();
dataSourceChecker.checkConnection(targetDataSources);
dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
- log.info("checkTargetDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 1fde15978c8..54c8f9013fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -56,7 +56,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) {
DatabaseType targetDatabaseType = param.getTargetDatabaseType();
if (!targetDatabaseType.isSchemaAvailable()) {
- log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
return;
}
CreateTableConfiguration createTableConfig = param.getCreateTableConfig();
@@ -74,7 +73,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
createdSchemaNames.add(targetSchemaName);
}
}
- log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
}
private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9f963e4fd64..89518f60a2f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -120,7 +120,6 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
@Override
public void onSuccess() {
- log.info("incremental dumper onSuccess, taskId={}", taskId);
}
@Override
@@ -134,7 +133,6 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
@Override
public void onSuccess() {
- log.info("importer onSuccess, taskId={}", taskId);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 32f653efa52..875f9faac39 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -59,7 +59,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
@Override
public void stop() {
jobItemContext.setStopping(true);
- log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
for (InventoryTask each : inventoryTasks) {
each.stop();
each.close();
@@ -73,13 +72,11 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
@Override
public void start() {
if (jobItemContext.isStopping()) {
- log.info("job stopping, ignore inventory task");
return;
}
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
if (executeInventoryTask()) {
if (jobItemContext.isStopping()) {
- log.info("stopping, ignore incremental task");
return;
}
executeIncrementalTask();
@@ -91,7 +88,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
log.info("All inventory tasks finished.");
return true;
}
- log.info("-------------- Start inventory task --------------");
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (InventoryTask each : inventoryTasks) {
@@ -136,7 +132,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
return;
}
- log.info("-------------- Start incremental task --------------");
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (IncrementalTask each : incrementalTasks) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 0d31db298eb..5795d74a8cc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -92,7 +92,6 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
@Override
public void onSuccess() {
- log.info("dumper onSuccess, taskId={}", taskId);
}
@Override
@@ -106,7 +105,6 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
@Override
public void onSuccess() {
- log.info("importer onSuccess, taskId={}", taskId);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 7793b9914fb..bf0553f9e69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -85,7 +85,6 @@ public final class PipelineDistributedBarrier {
*/
public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
if (!getRepository().isExisted(barrierPath)) {
- log.info("barrier path {} not exist, ignore", barrierPath);
return;
}
String key = String.join("/", barrierPath, Integer.toString(shardingItem));
@@ -145,7 +144,6 @@ public final class PipelineDistributedBarrier {
return;
}
List<String> childrenKeys = getRepository().getChildrenKeys(barrierPath);
- log.info("children keys: {}, total count: {}", childrenKeys, holder.getTotalCount());
if (childrenKeys.size() == holder.getTotalCount()) {
holder.getCountDownLatch().countDown();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index fe235973243..462b1582119 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -44,7 +44,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
- log.info("{} is disabled", jobId);
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
for (Integer each : shardingItems) {
@@ -58,7 +57,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
if (PipelineJobCenter.isJobExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
- log.info("{} executing jobs", jobId);
CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("execute failed, jobId={}", jobId, throwable);
@@ -67,7 +65,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
}
break;
case DELETED:
- log.info("deleted consistency check job id: {}", jobId);
PipelineJobCenter.stop(jobId);
break;
default:
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index c52622693d2..ccf9c8583f0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -75,7 +75,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
public void start() {
if (jobItemContext.isStopping()) {
- log.info("job stopping, ignore consistency check");
return;
}
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
@@ -86,7 +85,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
public void stop() {
jobItemContext.setStopping(true);
- log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
checkExecutor.stop();
}
@@ -94,7 +92,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
protected void runBlocking() {
- log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(jobType);
@@ -110,7 +107,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
protected void doStop() throws SQLException {
DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
- log.info("doStop, algorithm={}", algorithm);
if (null != algorithm) {
algorithm.cancel();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 6cb937a2970..58098ad7ed8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -45,7 +45,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
- log.info("{} is disabled", jobId);
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
for (Integer each : shardingItems) {
@@ -59,7 +58,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
if (PipelineJobCenter.isJobExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
- log.info("{} executing jobs", jobId);
CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("execute failed, jobId={}", jobId, throwable);
@@ -68,7 +66,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
}
break;
case DELETED:
- log.info("deleted jobId={}", jobId);
new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()));
PipelineJobCenter.stop(jobId);
break;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a1224ac2b67..2894967e8fc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -86,7 +86,6 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
checkJobItemContext.setRecordsCount(recordsCount);
checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
- log.info("consistency check, get records count: {}", recordsCount);
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index d1f88d38014..b7bd0fdbef4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -192,9 +192,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
Map<LogicTableName, Set<String>> shardingColumnsMap = ShardingColumnsExtractorFactory.getInstance().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new LogicTableName(jobConfig.getTargetTableName())));
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, tableNameSchemaNameMapping);
- MigrationTaskConfiguration result = new MigrationTaskConfiguration(jobConfig.getSourceResourceName(), createTableConfig, dumperConfig, importerConfig);
- log.info("buildTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
- return result;
+ return new MigrationTaskConfiguration(jobConfig.getSourceResourceName(), createTableConfig, dumperConfig, importerConfig);
}
private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
@@ -278,13 +276,12 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public void rollback(final String jobId) throws SQLException {
- log.info("Rollback job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
stop(jobId);
cleanTempTableOnRollback(jobId);
dropJob(jobId);
- log.info("Rollback cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
}
private void dropCheckJobs(final String jobId) {
@@ -292,8 +289,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
if (checkJobIds.isEmpty()) {
return;
}
- log.info("dropCheckJobs start...");
- long startTimeMillis = System.currentTimeMillis();
for (String each : checkJobIds) {
try {
dropJob(each);
@@ -303,7 +298,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
}
}
- log.info("dropCheckJobs cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
private void cleanTempTableOnRollback(final String jobId) throws SQLException {
@@ -336,7 +330,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public void addMigrationSourceResources(final Map<String, DataSourceProperties> dataSourcePropsMap) {
- log.info("Add migration source resources {}", dataSourcePropsMap.keySet());
Map<String, DataSourceProperties> existDataSources = dataSourcePersistService.load(getJobType());
Collection<String> duplicateDataSourceNames = new HashSet<>(dataSourcePropsMap.size(), 1);
for (Entry<String, DataSourceProperties> entry : dataSourcePropsMap.entrySet()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 122a20d5b77..aa715f0c95a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -64,20 +64,17 @@ public final class MigrationJobPreparer {
public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
- log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
prepareAndCheckTargetWithLock(jobItemContext);
if (jobItemContext.isStopping()) {
- log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
- log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
@@ -103,7 +100,6 @@ public final class MigrationJobPreparer {
boolean prepareFlag = JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
|| JobStatus.PREPARING_FAILURE.equals(jobItemProgress.getStatus());
if (prepareFlag) {
- log.info("execute prepare, jobId={}, shardingItem={}, jobStatus={}", jobConfig.getJobId(), jobItemContext.getShardingItem(), jobItemProgress.getStatus());
jobItemContext.setStatus(JobStatus.PREPARING);
JOB_API.updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
@@ -121,7 +117,6 @@ public final class MigrationJobPreparer {
private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
if (jobItemContext.isSourceTargetDatabaseTheSame()) {
- log.info("prepare target ...");
prepareTarget(jobItemContext);
}
InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
index 7445a132a37..b202edf176a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/datasource/MySQLJdbcQueryPropertiesExtension.java
@@ -54,9 +54,7 @@ public final class MySQLJdbcQueryPropertiesExtension implements JdbcQueryPropert
private static String initMysqlConnectorVersion() {
try {
Class<?> mysqlDriverClass = MySQLJdbcQueryPropertiesExtension.class.getClassLoader().loadClass("com.mysql.jdbc.Driver");
- String result = mysqlDriverClass.getPackage().getImplementationVersion();
- log.info("mysql connector version {}", result);
- return result;
+ return mysqlDriverClass.getPackage().getImplementationVersion();
} catch (final ClassNotFoundException ex) {
log.warn("not find com.mysql.jdbc.Driver class");
return null;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 2e5f432d33d..5a4d53ffdeb 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -91,39 +91,35 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
protected void runBlocking() {
client.connect();
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
- int eventCount = 0;
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null == event) {
continue;
}
- eventCount += handleEvent(event);
+ handleEvent(event);
}
- log.info("incremental dump, eventCount={}", eventCount);
channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
}
- private int handleEvent(final AbstractBinlogEvent event) {
+ private void handleEvent(final AbstractBinlogEvent event) {
if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) event).getDatabaseName().equals(catalog) || !dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
createPlaceholderRecord(event);
- return 0;
+ return;
}
if (event instanceof WriteRowsEvent) {
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
- return 1;
+ return;
}
if (event instanceof UpdateRowsEvent) {
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
- return 1;
+ return;
}
if (event instanceof DeleteRowsEvent) {
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
- return 1;
}
- return 0;
}
private void createPlaceholderRecord(final AbstractBinlogEvent event) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 3df3d068be7..6ab8369812b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -283,7 +283,6 @@ public final class MySQLClient {
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
- log.warn("channel inactive");
if (!running) {
return;
}
@@ -305,12 +304,11 @@ public final class MySQLClient {
running = false;
return;
}
- int retryTimes = reconnectTimes.incrementAndGet();
+ reconnectTimes.incrementAndGet();
if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
return;
}
- log.info("reconnect MySQL client, retry times={}", retryTimes);
closeChannel();
connect();
subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index abfad6e74bf..d31d48c8a90 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -37,7 +37,7 @@ import java.util.Collections;
* PostgreSQL Data source checker.
*/
@Slf4j
-public class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker {
+public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker {
private static final String SHOW_GRANTS_SQL = "SELECT * FROM pg_roles WHERE rolname = ?";