You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/09/21 09:14:12 UTC

[shardingsphere] branch master updated: Extract InventoryIncrementalJobPublicAPI for common usage (#21111)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa653d5da23 Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
fa653d5da23 is described below

commit fa653d5da237768c2972da0c4078c2524bc0f27e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Sep 21 17:13:53 2022 +0800

    Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
    
    * Move commit and rollback from PipelineJobPublicAPI to MigrationJobPublicAPI
    
    * Add InventoryIncrementalJobPublicAPI, move PipelineJobPublicAPI processConfig apis
    
    * Move commit and rollback to InventoryIncrementalJobPublicAPI
    
    * Throw SQLException for commit
    
    * Fix compile error
---
 .../handler/update/CommitMigrationUpdater.java     |  8 ++-
 ....java => InventoryIncrementalJobPublicAPI.java} | 30 ++-------
 .../data/pipeline/api/MigrationJobPublicAPI.java   |  2 +-
 .../data/pipeline/api/PipelineJobPublicAPI.java    | 48 ++------------
 .../pipeline/api/PipelineJobPublicAPIFactory.java  |  8 +--
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 71 +--------------------
 .../impl/InventoryIncrementalJobPublicAPIImpl.java | 74 ++++++++++++++++++++++
 .../scenario/migration/MigrationJobAPIImpl.java    | 23 +++++--
 ....pipeline.api.InventoryIncrementalJobPublicAPI} |  0
 ...toryIncrementalProcessConfigurationUpdater.java |  4 +-
 ...toryIncrementalProcessConfigurationUpdater.java |  4 +-
 .../DropPipelineProcessConfigurationUpdater.java   |  4 +-
 .../api/PipelineJobPublicAPIFactoryTest.java       |  8 +--
 .../core/api/impl/MigrationJobAPIImplTest.java     |  2 +-
 14 files changed, 126 insertions(+), 160 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
index 7f1544e10fd..5d1370a629c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
@@ -22,6 +22,8 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
 
+import java.sql.SQLException;
+
 /**
  * Commit migration updater.
  */
@@ -31,7 +33,11 @@ public final class CommitMigrationUpdater implements RALUpdater<CommitMigrationS
     
     @Override
     public void executeUpdate(final String databaseName, final CommitMigrationStatement sqlStatement) {
-        JOB_API.commit(sqlStatement.getJobId());
+        try {
+            JOB_API.commit(sqlStatement.getJobId());
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
similarity index 78%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 2898ade8cc1..53b89e72f35 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -18,16 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import java.sql.SQLException;
-import java.util.List;
 
 /**
- * Pipeline job public API.
+ * Inventory incremental job API.
  */
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI, TypedSPI {
     
     /**
      * Create process configuration.
@@ -57,20 +55,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
      */
     PipelineProcessConfiguration showProcessConfiguration();
     
-    /**
-     * Start disabled job.
-     *
-     * @param jobId job id
-     */
-    void startDisabledJob(String jobId);
-    
-    /**
-     * Stop pipeline job.
-     *
-     * @param jobId job id
-     */
-    void stop(String jobId);
-    
     /**
      * Rollback pipeline job.
      *
@@ -83,13 +67,7 @@ public interface PipelineJobPublicAPI extends TypedSPI {
      * Commit pipeline job.
      *
      * @param jobId job id
+     * @throws SQLException when commit underlying database data
      */
-    void commit(String jobId);
-    
-    /**
-     * Get pipeline job info.
-     *
-     * @return jobInfos
-     */
-    List<? extends PipelineJobInfo> list();
+    void commit(String jobId) throws SQLException;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 2ebc30b0921..e60b48bfe92 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -35,7 +35,7 @@ import java.util.Properties;
  * Migration job public API.
  */
 @SingletonSPI
-public interface MigrationJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
+public interface MigrationJobPublicAPI extends InventoryIncrementalJobPublicAPI, RequiredSPI {
     
     /**
      * List all jobs.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 2898ade8cc1..1b635825c7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,45 +17,22 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
-import java.sql.SQLException;
 import java.util.List;
 
 /**
  * Pipeline job public API.
  */
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface PipelineJobPublicAPI {
     
     /**
-     * Create process configuration.
+     * Get job type.
      *
-     * @param processConfig process configuration
+     * @return job type
      */
-    void createProcessConfiguration(PipelineProcessConfiguration processConfig);
-    
-    /**
-     * Alter process configuration.
-     *
-     * @param processConfig process configuration
-     */
-    void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
-    
-    /**
-     * Drop process configuration.
-     *
-     * @param confPath configuration path. e.g. <code>/</code>, <code>/READ</code>, <code>/READ/RATE_LIMITER</code>
-     */
-    void dropProcessConfiguration(String confPath);
-    
-    /**
-     * Show process configuration.
-     *
-     * @return process configuration, non-null
-     */
-    PipelineProcessConfiguration showProcessConfiguration();
+    JobType getJobType();
     
     /**
      * Start disabled job.
@@ -71,21 +48,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
      */
     void stop(String jobId);
     
-    /**
-     * Rollback pipeline job.
-     *
-     * @param jobId job id
-     * @throws SQLException when rollback underlying database data
-     */
-    void rollback(String jobId) throws SQLException;
-    
-    /**
-     * Commit pipeline job.
-     *
-     * @param jobId job id
-     */
-    void commit(String jobId);
-    
     /**
      * Get pipeline job info.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 4443bbc6db1..a6ab527ac32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -28,18 +28,18 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 public final class PipelineJobPublicAPIFactory {
     
     static {
-        ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
+        ShardingSphereServiceLoader.register(InventoryIncrementalJobPublicAPI.class);
         ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
     }
     
     /**
-     * Get instance of pipeline job public API.
+     * Get instance of inventory incremental job public API.
      *
      * @param jobTypeName job type name
      * @return got instance
      */
-    public static PipelineJobPublicAPI getPipelineJobPublicAPI(@NonNull final String jobTypeName) {
-        return TypedSPIRegistry.getRegisteredService(PipelineJobPublicAPI.class, jobTypeName);
+    public static InventoryIncrementalJobPublicAPI getInventoryIncrementalJobPublicAPI(@NonNull final String jobTypeName) {
+        return TypedSPIRegistry.getRegisteredService(InventoryIncrementalJobPublicAPI.class, jobTypeName);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3d5ce269185..235ff348eac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -23,22 +23,15 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -49,7 +42,6 @@ import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
-import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
@@ -66,51 +58,8 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-    
-    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-    
     private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
     
-    protected abstract JobType getJobType();
-    
-    @Override
-    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
-        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
-        ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
-        processConfigPersistService.persist(getJobType(), processConfig);
-    }
-    
-    @Override
-    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
-        // TODO check rateLimiter type match or not
-        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
-        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
-        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
-    }
-    
-    private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
-        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
-        ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
-        return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
-    }
-    
-    @Override
-    public void dropProcessConfiguration(final String confPath) {
-        String finalConfPath = confPath.trim();
-        PipelineProcessConfigurationUtil.verifyConfPath(confPath);
-        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
-        PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
-        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
-    }
-    
-    @Override
-    public PipelineProcessConfiguration showProcessConfiguration() {
-        PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
-        result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
-        return result;
-    }
-    
     @Override
     public final String marshalJobId(final PipelineJobId pipelineJobId) {
         return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + marshalJobIdLeftPart(pipelineJobId);
@@ -200,29 +149,11 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
     
-    @Override
-    public void rollback(final String jobId) throws SQLException {
-        log.info("Rollback job {}", jobId);
-        stop(jobId);
-        cleanTempTableOnRollback(jobId);
-        dropJob(jobId);
-    }
-    
-    protected abstract void cleanTempTableOnRollback(String jobId) throws SQLException;
-    
-    private void dropJob(final String jobId) {
+    protected void dropJob(final String jobId) {
         PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId), null);
         PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
     }
     
-    @Override
-    public void commit(final String jobId) {
-        checkModeConfig();
-        log.info("Commit job {}", jobId);
-        stop(jobId);
-        dropJob(jobId);
-    }
-    
     protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
         JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
         Preconditions.checkNotNull(result, new PipelineJobNotFoundException(jobId));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
new file mode 100644
index 00000000000..44f48883545
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+/**
+ * Inventory incremental job API implementation.
+ */
+public abstract class InventoryIncrementalJobPublicAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
+    
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    
+    @Override
+    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
+        processConfigPersistService.persist(getJobType(), processConfig);
+    }
+    
+    @Override
+    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        // TODO check rateLimiter type match or not
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+    
+    private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
+        return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+    }
+    
+    @Override
+    public void dropProcessConfiguration(final String confPath) {
+        String finalConfPath = confPath.trim();
+        PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+        PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+    
+    @Override
+    public PipelineProcessConfiguration showProcessConfiguration() {
+        PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
+        result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+        return result;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 560379526fa..27ef7c8f539 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -59,8 +59,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -116,7 +116,7 @@ import java.util.stream.IntStream;
  * Migration job API impl.
  */
 @Slf4j
-public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
     
     private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
     
@@ -127,7 +127,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
     
     @Override
-    protected JobType getJobType() {
+    public JobType getJobType() {
         return JobType.MIGRATION;
     }
     
@@ -356,7 +356,14 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     }
     
     @Override
-    protected void cleanTempTableOnRollback(final String jobId) throws SQLException {
+    public void rollback(final String jobId) throws SQLException {
+        log.info("Rollback job {}", jobId);
+        stop(jobId);
+        cleanTempTableOnRollback(jobId);
+        dropJob(jobId);
+    }
+    
+    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
         String targetTableName = jobConfig.getTargetTableName();
         // TODO use jobConfig.targetSchemaName
@@ -373,6 +380,14 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         }
     }
     
+    @Override
+    public void commit(final String jobId) throws SQLException {
+        checkModeConfig();
+        log.info("Commit job {}", jobId);
+        stop(jobId);
+        dropJob(jobId);
+    }
+    
     @Override
     public void addMigrationSourceResources(final Map<String, DataSourceProperties> dataSourcePropsMap) {
         log.info("Add migration source resources {}", dataSourcePropsMap.keySet());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
index 8c8e405a871..cdbe647657c 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.AlterInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class AlterInventoryIncrementalProcessConfigurationUpdater implemen
     
     @Override
     public void executeUpdate(final String databaseName, final AlterInventoryIncrementalProcessConfigurationStatement sqlStatement) {
-        PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+        InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
         PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
         jobAPI.alterProcessConfiguration(processConfig);
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
index 97454efb0ba..2fab9c8c6c3 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.CreateInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class CreateInventoryIncrementalProcessConfigurationUpdater impleme
     
     @Override
     public void executeUpdate(final String databaseName, final CreateInventoryIncrementalProcessConfigurationStatement sqlStatement) {
-        PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+        InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
         PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
         jobAPI.createProcessConfiguration(processConfig);
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
index 6d67482be2a..d550b456268 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.DropPipelineProcessConfigurationStatement;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
@@ -29,7 +29,7 @@ public final class DropPipelineProcessConfigurationUpdater implements RALUpdater
     
     @Override
     public void executeUpdate(final String databaseName, final DropPipelineProcessConfigurationStatement sqlStatement) {
-        PipelineJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+        InventoryIncrementalJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
         jobAPI.dropProcessConfiguration(sqlStatement.getConfPath());
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index fb2eca869b8..912a9ebf2c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -31,11 +31,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public final class PipelineJobPublicAPIFactoryTest {
     
     @Test
-    public void assertGetPipelineJobPublicAPI() {
-        Collection<Pair<JobType, Class<? extends PipelineJobPublicAPI>>> paramResult = new LinkedList<>();
+    public void assertGetInventoryIncrementalJobPublicAPI() {
+        Collection<Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>>> paramResult = new LinkedList<>();
         paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
-        for (Pair<JobType, Class<? extends PipelineJobPublicAPI>> each : paramResult) {
-            assertThat(PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
+        for (Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>> each : paramResult) {
+            assertThat(PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
         }
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 41d85c69570..9770d830787 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -135,7 +135,7 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertCommit() {
+    public void assertCommit() throws SQLException {
         Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());