You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/01 07:26:08 UTC
[shardingsphere] branch master updated: Merge PipelineJobItemAPI into PipelineJobAPI (#21880)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e7050f3d4cc Merge PipelineJobItemAPI into PipelineJobAPI (#21880)
e7050f3d4cc is described below
commit e7050f3d4cc95f6b2e492b93a18d98c130ad1992
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 1 15:26:01 2022 +0800
Merge PipelineJobItemAPI into PipelineJobAPI (#21880)
---
.../data/pipeline/core/api/PipelineJobAPI.java | 30 ++++++-
.../data/pipeline/core/api/PipelineJobItemAPI.java | 53 ------------
.../AbstractInventoryIncrementalJobAPIImpl.java | 60 +++++++++++---
.../impl/InventoryIncrementalJobItemAPIImpl.java | 93 ----------------------
.../core/task/InventoryIncrementalTasksRunner.java | 16 ++--
.../consistencycheck/ConsistencyCheckJobAPI.java | 3 +-
6 files changed, 91 insertions(+), 164 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index fa31460c16f..1e8913732ad 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -22,8 +22,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfigurat
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -33,7 +36,7 @@ import java.util.Optional;
* Pipeline job API.
*/
@SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI, TypedSPI {
+public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
/**
* Marshal pipeline job id.
@@ -84,6 +87,31 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
*/
PipelineJobConfiguration getJobConfiguration(String jobId);
+ /**
+ * Persist job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ void persistJobItemProgress(PipelineJobItemContext jobItemContext);
+
+ /**
+ * Get job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item progress, may be null
+ */
+ PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+
+ /**
+ * Update job item status.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param status status
+ */
+ void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
+
/**
* Get job item error message.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
deleted file mode 100644
index 1a492b79c88..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api;
-
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-
-/**
- * Pipeline job item API.
- */
-public interface PipelineJobItemAPI {
-
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress, may be null
- */
- PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 55e3ffafea9..c32105d2f14 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -26,12 +27,21 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -40,8 +50,10 @@ import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessC
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -55,11 +67,11 @@ import java.util.stream.IntStream;
@Slf4j
public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI, InventoryIncrementalJobPublicAPI {
- private final YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
+ private final YamlPipelineProcessConfigurationSwapper processConfigSwapper = new YamlPipelineProcessConfigurationSwapper();
private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
- private final InventoryIncrementalJobItemAPIImpl jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
+ private final YamlInventoryIncrementalJobItemProgressSwapper jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig);
@@ -80,7 +92,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
- return swapper.swapToYamlConfiguration(existingProcessConfig);
+ return processConfigSwapper.swapToYamlConfiguration(existingProcessConfig);
}
@Override
@@ -89,7 +101,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
PipelineProcessConfigurationUtil.verifyConfPath(confPath);
YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
- processConfigPersistService.persist(getJobType(), swapper.swapToObject(targetYamlProcessConfig));
+ processConfigPersistService.persist(getJobType(), processConfigSwapper.swapToObject(targetYamlProcessConfig));
}
@Override
@@ -120,18 +132,48 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
@Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
- return jobItemAPI.getJobItemProgress(jobId, shardingItem);
+ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+ InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
+ InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
+ jobItemProgress.setStatus(jobItemContext.getStatus());
+ jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+ jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
+ jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+ jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+ jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
+ jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
+ String value = YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
+ }
+
+ private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
+ IncrementalTask incrementalTask = incrementalTasks.size() > 0 ? incrementalTasks.iterator().next() : null;
+ return new JobItemIncrementalTasksProgress(null != incrementalTask ? incrementalTask.getTaskProgress() : null);
+ }
+
+ private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
+ Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
+ for (InventoryTask each : inventoryTasks) {
+ inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+ }
+ return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
}
@Override
- public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
- jobItemAPI.persistJobItemProgress(jobItemContext);
+ public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ return Strings.isNullOrEmpty(data) ? null : jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
}
@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+ InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
+ if (null == jobItemProgress) {
+ log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
+ return;
+ }
+ jobItemProgress.setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress)));
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
deleted file mode 100644
index 01eda47a28c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
-
-import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Inventory incremental job item API implementation.
- */
-@Slf4j
-public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItemAPI {
-
- private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
-
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
- InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
- InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
- jobItemProgress.setStatus(jobItemContext.getStatus());
- jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
- jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
- jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
- jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
- jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
- jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
- String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
- }
-
- private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
- IncrementalTask incrementalTask = incrementalTasks.size() > 0 ? incrementalTasks.iterator().next() : null;
- return new JobItemIncrementalTasksProgress(null != incrementalTask ? incrementalTask.getTaskProgress() : null);
- }
-
- private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
- Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
- for (InventoryTask each : inventoryTasks) {
- inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
- }
- return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
- }
-
- @Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
- String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
- return Strings.isNullOrEmpty(data) ? null : SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
- }
-
- @Override
- public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
- if (null == jobItemProgress) {
- log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
- return;
- }
- jobItemProgress.setStatus(status);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 40047a92600..4c7b7459edf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -42,8 +40,6 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public final class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
- private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
-
@Getter
private final PipelineJobItemContext jobItemContext;
@@ -51,6 +47,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
private final Collection<IncrementalTask> incrementalTasks;
+ private final PipelineJobAPI jobAPI;
+
+ public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+ this.jobItemContext = jobItemContext;
+ this.inventoryTasks = inventoryTasks;
+ this.incrementalTasks = incrementalTasks;
+ jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()));
+ }
+
@Override
public void stop() {
jobItemContext.setStopping(true);
@@ -100,7 +105,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
log.error("onFailure, inventory task execute failed.", throwable);
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
String jobId = jobItemContext.getJobId();
- PipelineJobAPI jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
jobAPI.stop(jobId);
}
@@ -120,7 +124,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
- jobItemAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+ jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}
private synchronized void executeIncrementalTask() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index d8495582568..3d328d8fffb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -19,10 +19,9 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
/**
* Consistency check job API.
*/
-public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI {
}