You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2022/12/28 02:48:54 UTC
[shardingsphere] branch master updated: Refactor pipeline getJobItemProgress return Optional (#23108)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5f3946ece36 Refactor pipeline getJobItemProgress return Optional (#23108)
5f3946ece36 is described below
commit 5f3946ece365bb5b034be19f1dc87677e1b0795e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Dec 28 10:48:47 2022 +0800
Refactor pipeline getJobItemProgress return Optional (#23108)
* Refactor GovernanceRepositoryAPI.getJobItemProgress return Optional
* Refactor PipelineJobAPI.getJobItemProgress return Optional
* Compatible with empty offset value
---
.../data/pipeline/cdc/api/impl/CDCJobAPIImpl.java | 3 ++-
.../data/pipeline/cdc/core/job/CDCJob.java | 6 +++--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 2 +-
.../pipeline/core/api/GovernanceRepositoryAPI.java | 2 +-
.../core/api/InventoryIncrementalJobAPI.java | 3 ++-
.../data/pipeline/core/api/PipelineJobAPI.java | 2 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 23 ++++++++---------
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 5 ++--
.../consistencycheck/ConsistencyCheckJob.java | 8 ++++--
.../api/impl/ConsistencyCheckJobAPI.java | 29 +++++++++++-----------
.../pipeline/scenario/migration/MigrationJob.java | 5 ++--
.../migration/prepare/MigrationJobPreparer.java | 9 ++++---
.../handler/cdc/fixture/FixtureCDCJobAPI.java | 2 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 13 +++++-----
.../core/api/impl/MigrationJobAPITest.java | 7 +++---
15 files changed, 63 insertions(+), 56 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
index 7b93604f380..7d7c6b271be 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
@@ -84,6 +84,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -232,7 +233,7 @@ public final class CDCJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
// TODO to be implemented
return null;
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 8dbe1a30974..1b7e68c95aa 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -38,6 +38,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTas
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import java.util.Optional;
+
/**
* CDC job.
*/
@@ -62,10 +64,10 @@ public final class CDCJob extends AbstractSimplePipelineJob {
protected PipelineJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
- return new CDCJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager, importerConnector);
+ return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, importerConnector);
}
protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f90bac04580..8a87b8f1497 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -54,7 +54,7 @@ public final class CDCJobPreparer {
* @param jobItemContext job item context
*/
public void prepare(final CDCJobItemContext jobItemContext) {
- if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
+ if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
if (jobItemContext.isStopping()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 75fb805a811..b65b5632713 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -55,7 +55,7 @@ public interface GovernanceRepositoryAPI {
* @param shardingItem sharding item
* @return job item progress
*/
- String getJobItemProgress(String jobId, int shardingItem);
+ Optional<String> getJobItemProgress(String jobId, int shardingItem);
/**
* Get latest check job id.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index f459fbcd91e..8e47caca94c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -30,6 +30,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -60,7 +61,7 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(PipelineJobConfiguration pipelineJobConfig);
@Override
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(String jobId, int shardingItem);
/**
* Get job infos.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 0508ad0aac8..2628da172ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -131,7 +131,7 @@ public interface PipelineJobAPI extends TypedSPI {
* @param shardingItem sharding item
* @return job item progress, may be null
*/
- PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+ Optional<? extends PipelineJobItemProgress> getJobItemProgress(String jobId, int shardingItem);
/**
* Update job item status.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 0398d914d5a..58b91b400d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
@@ -96,11 +95,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
- InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
- if (null != jobItemProgress) {
- jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
- }
- map.put(each, jobItemProgress);
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress = getJobItemProgress(jobId, each);
+ jobItemProgress.ifPresent(progress -> progress.setActive(!jobConfigPOJO.isDisabled()));
+ map.put(each, jobItemProgress.orElse(null));
}, LinkedHashMap::putAll);
}
@@ -157,19 +154,19 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
- String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
- return Strings.isNullOrEmpty(data) ? null : jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
+ public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
+ Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ return progress.map(s -> jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(s, YamlInventoryIncrementalJobItemProgress.class)));
}
@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
- if (null == jobItemProgress) {
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
return;
}
- jobItemProgress.setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress)));
+ jobItemProgress.get().setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 37c27a7942e..d28a50a2519 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -61,8 +61,9 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
}
@Override
- public String getJobItemProgress(final String jobId, final int shardingItem) {
- return repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem));
+ public Optional<String> getJobItemProgress(final String jobId, final int shardingItem) {
+ String text = repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem));
+ return Strings.isNullOrEmpty(text) ? Optional.empty() : Optional.of(text);
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 406a67b37f8..88a11c31209 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -24,11 +24,14 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import java.util.Optional;
+
/**
* Consistency check job.
*/
@@ -38,8 +41,9 @@ public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
@Override
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- ConsistencyCheckJobItemProgress jobItemProgress = (ConsistencyCheckJobItemProgress) getJobAPI().getJobItemProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
- return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress);
+ ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
+ return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress.orElse(null));
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b4d781ea6ba..463902206e8 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
@@ -30,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContex
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
@@ -96,8 +94,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
String parentJobId = param.getJobId();
Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
- PipelineJobItemProgress progress = getJobItemProgress(latestCheckJobId.get(), 0);
- if (null == progress || JobStatus.FINISHED != progress.getStatus()) {
+ Optional<ConsistencyCheckJobItemProgress> progress = getJobItemProgress(latestCheckJobId.get(), 0);
+ if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED, progress={}", progress);
throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
@@ -141,26 +139,26 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
}
@Override
- public ConsistencyCheckJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
- String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
- return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobItemProgress.class, true));
+ public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
+ Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true)));
}
@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
- if (null == jobItemProgress) {
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
- jobItemProgress.setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
+ jobItemProgress.get().setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
public void startDisabledJob(final String jobId) {
- PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
- if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, 0);
+ if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
@@ -223,11 +221,12 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+ Optional<ConsistencyCheckJobItemProgress> progressOptional = getJobItemProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
- if (null == jobItemProgress) {
+ if (!progressOptional.isPresent()) {
return result;
}
+ ConsistencyCheckJobItemProgress jobItemProgress = progressOptional.get();
LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
if (null == jobItemProgress.getRecordsCount() || null == jobItemProgress.getCheckedRecordsCount()) {
result.setFinishedPercentage(0);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 59d14624d0f..3ac7d7559f5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigur
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import java.sql.SQLException;
+import java.util.Optional;
/**
* Migration job.
@@ -56,10 +57,10 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
- return new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
+ return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
@Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 1edd4529270..ae007aa6a0c 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -49,6 +49,7 @@ import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.Optional;
/**
* Migration job preparer.
@@ -91,7 +92,7 @@ public final class MigrationJobPreparer {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String lockName = "prepare-" + jobConfig.getJobId();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
- if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
+ if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(lockName);
@@ -99,9 +100,9 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 180000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
try {
- InventoryIncrementalJobItemProgress jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- JobStatus currentStatus = null != jobItemProgress ? jobItemProgress.getStatus() : null;
- boolean prepareFlag = null == jobItemProgress || JobStatus.PREPARING.equals(currentStatus) || JobStatus.RUNNING.equals(currentStatus)
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+ JobStatus currentStatus = jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null);
+ boolean prepareFlag = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(currentStatus) || JobStatus.RUNNING.equals(currentStatus)
|| JobStatus.PREPARING_FAILURE.equals(currentStatus);
if (prepareFlag) {
jobItemContext.setStatus(JobStatus.PREPARING);
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
index 53e2c2c29b0..8617049ab4b 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
@@ -108,7 +108,7 @@ public final class FixtureCDCJobAPI implements InventoryIncrementalJobAPI, CDCJo
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
return null;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3caab14c0da..a836881ca50 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,12 +29,12 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -48,6 +48,7 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,7 +58,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -75,8 +75,9 @@ public final class GovernanceRepositoryAPIImplTest {
public void assertPersistJobProgress() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue");
- String actual = governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- assertThat(actual, is("testValue"));
+ Optional<String> actual = governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("testValue"));
}
@Test
@@ -94,8 +95,8 @@ public final class GovernanceRepositoryAPIImplTest {
public void assertDeleteJob() {
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", "");
governanceRepositoryAPI.deleteJob("1");
- String actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
- assertNull(actual);
+ Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
+ assertFalse(actual.isPresent());
}
@Test
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
index 80d6ddfa275..331cd7c3487 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
@@ -65,7 +65,6 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -235,9 +234,9 @@ public final class MigrationJobAPITest {
MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
- InventoryIncrementalJobItemProgress actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- assertNotNull(actual);
- assertThat(actual.getStatus(), is(JobStatus.FINISHED));
+ Optional<InventoryIncrementalJobItemProgress> actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+ assertTrue(actual.isPresent());
+ assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
}
@Test