Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/09/26 09:44:44 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21194: Refactor check migration, change to use async job

sandynz commented on code in PR #21194:
URL: https://github.com/apache/shardingsphere/pull/21194#discussion_r979746251


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobResultConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration for YAML.
+ */
+@Getter
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobResultConfiguration implements YamlPipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String referredJobId;
+    
+    private String algorithmTypeName;
+    
+    @Override
+    public String getTargetDatabaseName() {
+        return null;
+    }

Review Comment:
   It would be better to throw `UnsupportedOperationException` here instead of returning `null`.
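
   A minimal sketch of this suggestion, assuming the getter is never meaningful for a consistency check job. The interface and class names below are simplified, hypothetical stand-ins, not the actual ShardingSphere types.

```java
// Hypothetical stand-in for YamlPipelineJobConfiguration; only the method under discussion is shown.
interface JobConfigurationSketch {
    
    String getTargetDatabaseName();
}

// Hypothetical stand-in for the consistency check job configuration.
final class ConsistencyCheckConfigSketch implements JobConfigurationSketch {
    
    @Override
    public String getTargetDatabaseName() {
        // Fail fast instead of returning null, so an accidental caller sees the problem at the call site.
        throw new UnsupportedOperationException("getTargetDatabaseName is not supported for consistency check job");
    }
}
```

   Returning `null` would push the failure to wherever the value is first dereferenced; throwing makes the unsupported call visible immediately.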



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobResultConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration for YAML.
+ */
+@Getter
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobResultConfiguration implements YamlPipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String referredJobId;

Review Comment:
   Could we rename it to `parentJobId`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception.job;
+
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
+/**
+ * Pipeline job has already existed exception.
+ */
+public final class PipelineJobHasAlreadyExistedException extends PipelineSQLException {
+    
+    private static final long serialVersionUID = 2854259384634892428L;
+    
+    public PipelineJobHasAlreadyExistedException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already existed", jobId);
+    }

Review Comment:
   Error code `81` is already used by `PipelineJobHasAlreadyStartedException`
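
   One way to catch this kind of clash early is a small uniqueness check over the error codes, for example in a unit test. This is only a sketch: `ErrorCodeRegistrySketch` is a hypothetical helper and does not exist in the project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that rejects an error code already claimed by another exception.
final class ErrorCodeRegistrySketch {
    
    private final Map<Integer, String> codes = new HashMap<>();
    
    void register(final int errorCode, final String exceptionName) {
        // putIfAbsent returns the previous owner of the code, or null if the code was free.
        String existing = codes.putIfAbsent(errorCode, exceptionName);
        if (null != existing) {
            throw new IllegalStateException(String.format("Error code %d is already used by %s", errorCode, existing));
        }
    }
}
```

   Registering `81` for `PipelineJobHasAlreadyStartedException` and then again for `PipelineJobHasAlreadyExistedException` would fail on the second call.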



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job configuration swapper.
+ */
+public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobResultConfiguration, ConsistencyCheckJobConfiguration> {

Review Comment:
   > <YamlConsistencyCheckJobResultConfiguration, ConsistencyCheckJobConfiguration>
   
   Could we rename `YamlConsistencyCheckJobResultConfiguration` to `YamlConsistencyCheckJobConfiguration`? The current pairing of type parameters looks strange.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultConfiguration.YamlDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job.
+ */
+@Slf4j
+public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
+    
+    private static final YamlDataConsistencyCheckResultSwapper CHECK_RESULT_SWAPPER = new YamlDataConsistencyCheckResultSwapper();
+    
+    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        String checkJobId = shardingContext.getJobName();
+        setJobId(checkJobId);
+        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, JobStatus.FINISHED);
+        jobAPI.persistJobItemProgress(jobItemContext);
+        String referredJobId = consistencyCheckJobConfig.getReferredJobId();
+        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, referredJobId);
+        JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+        InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
+        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
+        if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId);
+        } else {
+            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), null);

Review Comment:
   The third parameter, `algorithmProps`, is missing: `null` is passed here.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+
+/**
+ * Consistency check job API.
+ */
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, InventoryIncrementalJobAPI {

Review Comment:
   It should not extend `InventoryIncrementalJobAPI`; `PipelineJobAPI` could be used instead, possibly together with `PipelineJobItemAPI`.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;

Review Comment:
   `Integer` could be `int`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+    
+    @Override
+    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
+            PipelineJobCenter.stop(jobId);
+            return;
+        }
+        switch (eventType) {
+            case ADDED:
+            case UPDATED:
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it already exists", jobId);
+                } else {
+                    log.info("{} executing jobs", jobId);
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
+                }
+                break;
+            case DELETED:
+                log.info("deleted consistency check job id: {}", jobId);
+                PipelineJobCenter.stop(jobId);
+                break;
+            default:
+                break;
+        }
+    }
+    
+    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+        ConsistencyCheckJob job = new ConsistencyCheckJob();
+        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+        oneOffJobBootstrap.execute();
+        job.setOneOffJobBootstrap(oneOffJobBootstrap);

Review Comment:
   It's better to put `job.setOneOffJobBootstrap(oneOffJobBootstrap);` before `oneOffJobBootstrap.execute();`
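
   A sketch of why the order can matter, assuming the job (or anything stopping it) may need the bootstrap reference as soon as execution is triggered. All types here are hypothetical stand-ins, and `execute()` runs the job on the calling thread purely to keep the example deterministic; the real `OneOffJobBootstrap` may behave differently.

```java
// Hypothetical stand-in for OneOffJobBootstrap. For simplicity, execute() runs the job on the calling thread.
final class BootstrapSketch {
    
    private final JobSketch job;
    
    BootstrapSketch(final JobSketch job) {
        this.job = job;
    }
    
    void execute() {
        job.run();
    }
}

// Hypothetical stand-in for ConsistencyCheckJob.
final class JobSketch {
    
    private BootstrapSketch bootstrap;
    
    private boolean bootstrapSetWhenRun;
    
    void setBootstrap(final BootstrapSketch bootstrap) {
        this.bootstrap = bootstrap;
    }
    
    void run() {
        // Records whether the bootstrap reference was already available when the job started.
        bootstrapSetWhenRun = null != bootstrap;
    }
    
    boolean isBootstrapSetWhenRun() {
        return bootstrapSetWhenRun;
    }
}

final class OrderingSketch {
    
    // Suggested order: hand the bootstrap to the job first, then trigger execution.
    static JobSketch startWithSetterFirst() {
        JobSketch job = new JobSketch();
        BootstrapSketch bootstrap = new BootstrapSketch(job);
        job.setBootstrap(bootstrap);
        bootstrap.execute();
        return job;
    }
    
    // Order in the diff: the job runs before it has been given the bootstrap.
    static JobSketch startWithExecuteFirst() {
        JobSketch job = new JobSketch();
        BootstrapSketch bootstrap = new BootstrapSketch(job);
        bootstrap.execute();
        job.setBootstrap(bootstrap);
        return job;
    }
}
```

   With the setter first, the job always sees a non-null bootstrap; with execute first, that depends on timing.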



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consistency check job item context.
+ */
+@Getter
+@Setter
+@Slf4j
+public final class ConsistencyCheckJobItemContext implements InventoryIncrementalJobItemContext {

Review Comment:
   It should not implement `InventoryIncrementalJobItemContext`; `PipelineJobItemContext` could be used instead.



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java:
##########
@@ -159,10 +162,20 @@ protected String getJobIdByTableName(final String tableName) {
         return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
-        List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
+    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
+        proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+        List<Map<String, Object>> checkJobResults = Collections.emptyList();
+        for (int i = 0; i < 10; i++) {
+            checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+                break;
+            }
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
+        }

Review Comment:
   Are the for loop and sleep still needed?
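
   If polling does turn out to be necessary because the check now runs as an async job, the loop could be pulled into a bounded helper. This is a sketch only: `PollingSketch` and its parameters are hypothetical, and it uses plain `TimeUnit` sleeping rather than the project's `ThreadUtil`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: repeat a query until it returns a non-empty list or the attempts run out.
final class PollingSketch {
    
    static <T> List<T> pollUntilNotEmpty(final Supplier<List<T>> query, final int maxAttempts, final long sleepMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            List<T> result = query.get();
            if (null != result && !result.isEmpty()) {
                return result;
            }
            if (i < maxAttempts - 1) {
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                } catch (final InterruptedException ex) {
                    // Restore the interrupt flag and give up polling.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return Collections.emptyList();
    }
}
```

   The test would then call something like `pollUntilNotEmpty(() -> queryForListWithLog(...), 10, 3000L)` and assert on the returned rows.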



##########
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class ConsistencyCheckJobAPIFactoryTest {
+    
+    @Test
+    public void assertGetInstance() {
+        assertThat(ConsistencyCheckJobAPIFactory.getInstance(), instanceOf(ConsistencyCheckJobAPIImpl.class));
+    }
+}

Review Comment:
   The other API factories need unit tests for `ConsistencyCheckJobAPI` too.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;

Review Comment:
   Also, could we rename it to something like `sequence`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Create consistency check job parameter.
+ */
+@Data
+@RequiredArgsConstructor
+public final class CreateConsistencyCheckJobParameter {
+    
+    private final String jobId;
+    
+    private final String algorithmTypeName;
+}

Review Comment:
   It seems the algorithm properties are missing.
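
   A sketch of the parameter class with the properties carried along. It is written without Lombok so that it stands alone; the class name is a hypothetical stand-in, and the field name `algorithmProps` is an assumption borrowed from the related comment on `ConsistencyCheckJob`.

```java
import java.util.Properties;

// Hypothetical variant of CreateConsistencyCheckJobParameter that also carries algorithm properties.
final class CreateCheckJobParameterSketch {
    
    private final String jobId;
    
    private final String algorithmTypeName;
    
    // Assumed field: properties to pass to the consistency check algorithm.
    private final Properties algorithmProps;
    
    CreateCheckJobParameterSketch(final String jobId, final String algorithmTypeName, final Properties algorithmProps) {
        this.jobId = jobId;
        this.algorithmTypeName = algorithmTypeName;
        this.algorithmProps = algorithmProps;
    }
    
    String getJobId() {
        return jobId;
    }
    
    String getAlgorithmTypeName() {
        return algorithmTypeName;
    }
    
    Properties getAlgorithmProps() {
        return algorithmProps;
    }
}
```

   The job could then forward `getAlgorithmProps()` instead of `null` when it calls the three-argument `dataConsistencyCheck`.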



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultConfiguration.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Map;
+
+/**
+ * Yaml data consistency check result config.
+ */
+@Getter
+@Setter
+@ToString
+public final class YamlDataConsistencyCheckResultConfiguration {
+    

Review Comment:
   Could we just use `YamlDataConsistencyCheckResult`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@Slf4j
+@ToString
+public final class ConsistencyCheckJobConfiguration implements PipelineJobConfiguration {
+    
+    private final String jobId;
+    
+    private final String referredJobId;
+    
+    private final String algorithmTypeName;
+    
+    @Override
+    public String getSourceDatabaseType() {
+        return null;
+    }

Review Comment:
   It would be better to throw `UnsupportedOperationException` here instead of returning `null`.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.CONSISTENCY_CHECK;
+    }

Review Comment:
   `getJobType()` could be moved to the bottom of the class



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api;
+
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.Map;
+
+/**
+ * Migration job public API.
+ */
+@SingletonSPI
+public interface ConsistencyCheckJobPublicAPI extends InventoryIncrementalJobPublicAPI, RequiredSPI {

Review Comment:
   It should not extend `InventoryIncrementalJobPublicAPI`; `PipelineJobPublicAPI` might be the better choice



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+    
+    @Override
+    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
+            PipelineJobCenter.stop(jobId);
+            return;
+        }
+        switch (eventType) {
+            case ADDED:
+            case UPDATED:
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it already exists", jobId);
+                } else {
+                    log.info("{} executing jobs", jobId);
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());

Review Comment:
   A `whenComplete` callback should be chained onto `CompletableFuture.runAsync`, so that an exception thrown by the async task is not silently lost
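
   A minimal sketch of what this could look like, not the PR's actual code. `AsyncJobStarter`, `startAsync` and `LAST_FAILURE` are hypothetical names used only for illustration; in the real processor the callback would presumably call `log.error(...)` instead of writing to `System.err`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, for illustration only.
final class AsyncJobStarter {

    // Stands in for a logger so that the failure is observable in this sketch.
    static final AtomicReference<Throwable> LAST_FAILURE = new AtomicReference<>();

    static CompletableFuture<Void> startAsync(final String jobId, final Runnable job, final Executor executor) {
        return CompletableFuture.runAsync(job, executor).whenComplete((unused, throwable) -> {
            // throwable is null when the task finished normally.
            if (null != throwable) {
                LAST_FAILURE.set(throwable);
                System.err.println(jobId + " execute failed: " + throwable);
            }
        });
    }
}
```

   Without the callback, an exception thrown inside the task only completes the returned future exceptionally; if nobody inspects that future, the failure goes unnoticed.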



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    

Review Comment:
   It should not extend `AbstractInventoryIncrementalJobAPIImpl`; `AbstractPipelineJobAPIImpl` could be used instead



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;
     
-    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final long createTimeMillis) {
+    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final int consistencyCheckVersion) {
         super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
         this.pipelineJobId = pipelineJobId;
-        this.createTimeMinutes = DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(createTimeMillis));
+        if (consistencyCheckVersion > MAX_CONSISTENCY_CHECK_VERSION) {
+            this.consistencyCheckVersion = 0;
+        } else {
+            this.consistencyCheckVersion = consistencyCheckVersion;
+        }
+    }
+    
+    /**
+     * Get consistency check version.
+     *
+     * @param consistencyCheckJobId consistency check job id.
+     * @return consistency check version
+     */
+    public static int getConsistencyCheckVersion(final @NonNull String consistencyCheckJobId) {
+        String versionString = consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+        int version = Integer.parseInt(versionString);
+        return version > MAX_CONSISTENCY_CHECK_VERSION ? 0 : version;

Review Comment:
   It's better not to change the version any more after it has been constructed
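
   One possible reading of this, as a sketch only: decide the wrap-around once, before the id is constructed, and have the parsing side return the stored digit unchanged. `CheckJobSequence`, `next` and `parse` are hypothetical names, not part of the PR.

```java
// Hypothetical value class, for illustration only.
final class CheckJobSequence {

    static final int MAX_SEQUENCE = 9;

    private final int sequence;

    CheckJobSequence(final int sequence) {
        // Reject out-of-range values instead of silently rewriting them.
        if (sequence < 0 || sequence > MAX_SEQUENCE) {
            throw new IllegalArgumentException("sequence out of range: " + sequence);
        }
        this.sequence = sequence;
    }

    int getSequence() {
        return sequence;
    }

    // The only place where wrap-around happens: computing the next value to construct with.
    static int next(final int latest) {
        return latest >= MAX_SEQUENCE ? 0 : latest + 1;
    }

    // Reads the trailing digit back as-is, without normalizing it again.
    static int parse(final String jobId) {
        return Integer.parseInt(jobId.substring(jobId.length() - 1));
    }
}
```

   Since the parsed value is a single trailing digit, it can never exceed 9, so a second range check on the read path would have no effect anyway.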



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java:
##########
@@ -72,12 +72,12 @@ public void assertGetJobConfigPath() {
     
     @Test
     public void assertGetCheckLatestResultPath() {
-        assertThat(PipelineMetaDataNode.getCheckLatestResultPath(jobId), is(jobCheckRootPath + "/latest_result"));
+        assertThat(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), is(jobCheckRootPath + "/latest_job_id"));
     }
     
     @Test
-    public void assertGetCheckLatestDetailedResultPath() {
-        assertThat(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), is(jobCheckRootPath + "/latest_detailed_result"));
+    public void assertgetCheckJobResultPath() {
+        assertThat(PipelineMetaDataNode.getCheckJobResultPath(jobId, "j02fx123"), is(jobCheckRootPath + "/job_ids/j02fx123"));
     }

Review Comment:
   The names of these 2 methods could be updated



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.CONSISTENCY_CHECK;
+    }

Review Comment:
   The same goes for `getJobClassName()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org