You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/05 12:39:38 UTC
[shardingsphere] branch master updated: Revise 21929 (#21975)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b150a056460 Revise 21929 (#21975)
b150a056460 is described below
commit b150a056460aee2bae2af16f011c948618ca303d
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Nov 5 20:39:31 2022 +0800
Revise 21929 (#21975)
* Remove unused start_time property
* Use @RequiredArgsConstructor to replace the explicit constructor; change field ordering
* Rename JobItemProgressInfo to JobItemInfo
* Improve code style of JobStatusQueryResultSet classes
* Rename ConsistencyCheckJobProgress to ConsistencyCheckJobItemProgress
* Clean comment
---
.../ShowMigrationCheckStatusQueryResultSet.java | 21 ++++------
.../ShowMigrationJobStatusQueryResultSet.java | 22 +++++-----
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 8 ++--
.../api/InventoryIncrementalJobPublicAPI.java | 8 ++--
.../StandardPipelineDataSourceConfiguration.java | 1 -
...s.java => ConsistencyCheckJobItemProgress.java} | 5 ++-
...sInfo.java => ConsistencyCheckJobItemInfo.java} | 6 ++-
...o.java => InventoryIncrementalJobItemInfo.java} | 18 +++-----
.../AbstractInventoryIncrementalJobAPIImpl.java | 16 ++++----
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 4 +-
...va => YamlConsistencyCheckJobItemProgress.java} | 4 +-
...amlConsistencyCheckJobItemProgressSwapper.java} | 14 +++----
.../ConsistencyCheckJobAPIImpl.java | 48 +++++++++++-----------
13 files changed, 83 insertions(+), 92 deletions(-)
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index fa1afc153ad..7068bdabe12 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -27,9 +27,8 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Optional;
/**
@@ -44,15 +43,13 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
- ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
- List<Collection<Object>> result = new LinkedList<>();
- String checkResult = null == progressInfo.getCheckSuccess() ? "" : progressInfo.getCheckSuccess().toString();
- result.add(Arrays.asList(Optional.ofNullable(progressInfo.getTableNames()).orElse(""), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
- progressInfo.getRemainingSeconds(),
- Optional.ofNullable(progressInfo.getCheckBeginTime()).orElse(""),
- Optional.ofNullable(progressInfo.getCheckEndTime()).orElse(""),
- progressInfo.getDurationSeconds(), Optional.ofNullable(progressInfo.getErrorMessage()).orElse("")));
- data = result.iterator();
+ ConsistencyCheckJobItemInfo info = JOB_API.getJobItemInfo(checkMigrationStatement.getJobId());
+ String checkResult = null == info.getCheckSuccess() ? "" : info.getCheckSuccess().toString();
+ Collection<Object> result = Arrays.asList(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
+ String.valueOf(info.getFinishedPercentage()), info.getRemainingSeconds(),
+ Optional.ofNullable(info.getCheckBeginTime()).orElse(""), Optional.ofNullable(info.getCheckEndTime()).orElse(""),
+ info.getDurationSeconds(), Optional.ofNullable(info.getErrorMessage()).orElse(""));
+ data = Collections.singletonList(result).iterator();
}
@Override
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 9f288faef45..fd22fe35d92 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -46,19 +46,12 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
long currentTimeMillis = System.currentTimeMillis();
- List<InventoryIncrementalJobItemProgressInfo> jobProgress = JOB_API.getJobProgressInfos(((ShowMigrationStatusStatement) sqlStatement).getJobId());
- data = jobProgress.stream().map(each -> {
+ List<InventoryIncrementalJobItemInfo> jobItemInfos = JOB_API.getJobItemInfos(((ShowMigrationStatusStatement) sqlStatement).getJobId());
+ data = jobItemInfos.stream().map(each -> {
Collection<Object> result = new LinkedList<>();
result.add(each.getShardingItem());
InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
- if (null == jobItemProgress) {
- result.add("");
- result.add("");
- result.add("");
- result.add("");
- result.add("");
- result.add("");
- } else {
+ if (null != jobItemProgress) {
result.add(jobItemProgress.getDataSourceName());
result.add(jobItemProgress.getStatus());
result.add(jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
@@ -70,6 +63,13 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
}
result.add(incrementalIdleSeconds);
+ } else {
+ result.add("");
+ result.add("");
+ result.add("");
+ result.add("");
+ result.add("");
+ result.add("");
}
result.add(each.getErrorMessage());
return result;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 2e59d9e3976..039b2a0342c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -62,10 +62,10 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
void stopByParentJobId(String parentJobId);
/**
- * Get consistency job progress info.
+ * Get consistency job item info.
*
* @param parentJobId parent job id
- * @return consistency job progress info
+ * @return consistency job item info
*/
- ConsistencyCheckJobProgressInfo getJobProgressInfo(String parentJobId);
+ ConsistencyCheckJobItemInfo getJobItemInfo(String parentJobId);
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index d39f7ad683a..da75b1a92f3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -77,12 +77,12 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
void commit(String jobId);
/**
- * Get job progress info list.
+ * Get job item infos.
*
* @param jobId job id
- * @return all sharding item progress infos
+ * @return job item infos
*/
- List<InventoryIncrementalJobItemProgressInfo> getJobProgressInfos(String jobId);
+ List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
/**
* List all data consistency check algorithms from SPI.
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index cb708db902e..adb3c3c6850 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -71,7 +71,6 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
for (String each : Arrays.asList("minPoolSize", "minimumIdle")) {
yamlConfig.put(each, "1");
}
- // TODO jdbcUrl not find now, can be deleted after confirmation
if (yamlConfig.containsKey("jdbcUrl")) {
yamlConfig.put("url", yamlConfig.get("jdbcUrl"));
yamlConfig.remove("jdbcUrl");
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
similarity index 88%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index e3297179140..dcd0195bfa3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -23,12 +23,13 @@ import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
/**
- * Data consistency check job progress.
+ * Data consistency check job item progress.
*/
+// TODO use final for fields
@Getter
@Setter
@ToString
-public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
+public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {
private JobStatus status = JobStatus.RUNNING;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
similarity index 87%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
index e8027a8d9a6..33252fab5e8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
@@ -21,11 +21,13 @@ import lombok.Getter;
import lombok.Setter;
/**
- * Consistency check job progress info.
+ * Consistency check job item info.
*/
+// TODO use final for fields
+// TODO embed ConsistencyCheckJobItemProgress to reduce fields
@Getter
@Setter
-public final class ConsistencyCheckJobProgressInfo {
+public final class ConsistencyCheckJobItemInfo {
private String tableNames;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
similarity index 69%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 101fb573098..2521ef3652e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -18,27 +18,21 @@
package org.apache.shardingsphere.data.pipeline.api.pojo;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
/**
- * Inventory incremental job item progress info.
+ * Inventory incremental job item info.
*/
+@RequiredArgsConstructor
@Getter
-public final class InventoryIncrementalJobItemProgressInfo {
+public final class InventoryIncrementalJobItemInfo {
private final int shardingItem;
- private final String errorMessage;
+ private final InventoryIncrementalJobItemProgress jobItemProgress;
private final long startTimeMillis;
- private final InventoryIncrementalJobItemProgress jobItemProgress;
-
- public InventoryIncrementalJobItemProgressInfo(final int shardingItem, final String errorMessage, final long startTimeMills,
- final InventoryIncrementalJobItemProgress jobItemProgress) {
- this.shardingItem = shardingItem;
- this.errorMessage = errorMessage;
- this.startTimeMillis = startTimeMills;
- this.jobItemProgress = jobItemProgress;
- }
+ private final String errorMessage;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index a9895b1fa97..10290c57c7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -129,16 +129,16 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
@Override
- public List<InventoryIncrementalJobItemProgressInfo> getJobProgressInfos(final String jobId) {
+ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
- List<InventoryIncrementalJobItemProgressInfo> result = new ArrayList<>(jobProgress.size());
+ List<InventoryIncrementalJobItemInfo> result = new ArrayList<>(jobProgress.size());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
- InventoryIncrementalJobItemProgressInfo progressInfo = new InventoryIncrementalJobItemProgressInfo(shardingItem, errorMessage, startTimeMillis, entry.getValue());
+ InventoryIncrementalJobItemInfo progressInfo = new InventoryIncrementalJobItemInfo(shardingItem, entry.getValue(), startTimeMillis, errorMessage);
if (null == entry.getValue()) {
continue;
}
@@ -151,15 +151,15 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
- jobItemProgress.setStatus(jobItemContext.getStatus());
- jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
- jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
+ jobItemProgress.setStatus(context.getStatus());
+ jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+ jobItemProgress.setDataSourceName(context.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
String value = YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), value);
}
private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 4c1b8aae6fc..0daae87d451 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -109,7 +109,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
jobConfigPOJO.getProps().setProperty("create_time", createTimeFormat);
- jobConfigPOJO.getProps().setProperty("start_time", createTimeFormat);
jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
return YamlEngine.marshal(jobConfigPOJO);
}
@@ -125,10 +124,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
+ jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
jobConfigPOJO.getProps().remove("stop_time");
jobConfigPOJO.getProps().remove("stop_time_millis");
- jobConfigPOJO.getProps().setProperty("start_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
- jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
similarity index 90%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index b5d1940ddfb..4b610032745 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -22,11 +22,11 @@ import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data consistency check job progress.
+ * Yaml data consistency check job item progress.
*/
@Getter
@Setter
-public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+public final class YamlConsistencyCheckJobItemProgress implements YamlConfiguration {
private String status;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
similarity index 74%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index a3f5dcf20b1..9f4dabdd9ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -18,17 +18,17 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
- * YAML data check job progress swapper.
+ * YAML data check job item progress swapper.
*/
-public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobProgress, ConsistencyCheckJobProgress> {
+public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobItemProgress, ConsistencyCheckJobItemProgress> {
@Override
- public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
- YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
+ public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final ConsistencyCheckJobItemProgress data) {
+ YamlConsistencyCheckJobItemProgress result = new YamlConsistencyCheckJobItemProgress();
result.setStatus(data.getStatus().name());
result.setRecordsCount(data.getRecordsCount());
result.setCheckedRecordsCount(data.getCheckedRecordsCount());
@@ -39,8 +39,8 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
}
@Override
- public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
- ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+ public ConsistencyCheckJobItemProgress swapToObject(final YamlConsistencyCheckJobItemProgress yamlConfig) {
+ ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
result.setRecordsCount(yamlConfig.getRecordsCount());
result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 4f4f66f74e3..2ef7f1db3b0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -30,9 +30,9 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContex
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -42,8 +42,8 @@ import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJob
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,7 +67,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- private final YamlConsistencyCheckJobProgressSwapper swapper = new YamlConsistencyCheckJobProgressSwapper();
+ private final YamlConsistencyCheckJobItemProgressSwapper swapper = new YamlConsistencyCheckJobItemProgressSwapper();
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -112,33 +112,33 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
- ConsistencyCheckJobItemContext checkJobItemContext = (ConsistencyCheckJobItemContext) jobItemContext;
- ConsistencyCheckJobProgress jobProgress = new ConsistencyCheckJobProgress();
- jobProgress.setStatus(jobItemContext.getStatus());
- jobProgress.setCheckedRecordsCount(checkJobItemContext.getCheckedRecordsCount().get());
- jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
- jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
- jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
- jobProgress.setTableNames(String.join(",", checkJobItemContext.getTableNames()));
- YamlConsistencyCheckJobProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobProgress);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+ ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
+ ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress();
+ jobItemProgress.setStatus(context.getStatus());
+ jobItemProgress.setCheckedRecordsCount(context.getCheckedRecordsCount().get());
+ jobItemProgress.setRecordsCount(context.getRecordsCount());
+ jobItemProgress.setCheckBeginTimeMillis(context.getCheckBeginTimeMillis());
+ jobItemProgress.setCheckEndTimeMillis(context.getCheckEndTimeMillis());
+ jobItemProgress.setTableNames(String.join(",", context.getTableNames()));
+ YamlConsistencyCheckJobItemProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobItemProgress);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
}
@Override
- public ConsistencyCheckJobProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ public ConsistencyCheckJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
- return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
+ return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobItemProgress.class, true));
}
@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- ConsistencyCheckJobProgress jobProgress = getJobItemProgress(jobId, shardingItem);
- if (null == jobProgress) {
+ ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
+ if (null == jobItemProgress) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
- jobProgress.setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobProgress)));
+ jobItemProgress.setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
}
@Override
@@ -169,12 +169,12 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
}
@Override
- public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+ public ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = checkLatestJobId.get();
- ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
- ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+ ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+ ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
if (null == jobItemProgress) {
return result;
}