You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/03 08:09:15 UTC

[shardingsphere] branch master updated: Refactor pipeline JobType as SPI (#22598)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a943c2286 Refactor pipeline JobType as SPI (#22598)
36a943c2286 is described below

commit 36a943c2286f2f2908fb2ca39ab0a1b5c53cac8f
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Dec 3 16:09:09 2022 +0800

    Refactor pipeline JobType as SPI (#22598)
    
    * Remove lowercaseTypeName in JobType
    
    * Refactor JobType as interface and AbstractJobType
    
    * Refactor JobType as SPI
---
 .../data/pipeline/api/PipelineJobPublicAPI.java    |  2 +-
 .../{JobType.java => type/AbstractJobType.java}    | 35 ++-----------
 .../data/pipeline/spi/job/JobType.java}            | 25 +++++-----
 .../data/pipeline/spi/job/JobTypeFactory.java      | 57 ++++++++++++++++++++++
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  2 +-
 .../data/pipeline/core/api/PipelineAPIFactory.java |  2 +-
 .../core/api/PipelineMetaDataPersistService.java   |  2 +-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  6 +--
 .../api/impl/PipelineDataSourcePersistService.java |  2 +-
 ...PipelineProcessConfigurationPersistService.java |  6 +--
 .../pipeline/core/job/AbstractPipelineJobId.java   |  4 +-
 .../data/pipeline/core/job/PipelineJobIdUtils.java |  7 ++-
 .../type/ConsistencyCheckJobType.java}             | 28 +++--------
 .../core/metadata/node/PipelineMetaDataNode.java   |  4 +-
 ...ineChangedJobConfigurationProcessorFactory.java |  2 +-
 ...he.shardingsphere.data.pipeline.spi.job.JobType | 18 +++++++
 .../consistencycheck/ConsistencyCheckJobId.java    |  4 +-
 .../api/impl/ConsistencyCheckJobAPIImpl.java       |  6 ++-
 ...tencyCheckChangedJobConfigurationProcessor.java |  4 +-
 .../task/ConsistencyCheckTasksRunner.java          |  2 +-
 .../scenario/migration/MigrationJobId.java         |  3 +-
 .../scenario/migration/MigrationJobType.java}      | 28 +++--------
 .../migration/api/impl/MigrationJobAPIImpl.java    | 13 +++--
 .../MigrationChangedJobConfigurationProcessor.java |  4 +-
 ...he.shardingsphere.data.pipeline.spi.job.JobType | 18 +++++++
 .../pipeline/core/job/PipelineJobIdUtilsTest.java  |  7 +--
 .../data/pipeline/api/PipelineAPIFactoryTest.java  |  7 +--
 .../api/PipelineJobPublicAPIFactoryTest.java       |  5 +-
 .../core/api/impl/MigrationJobAPIImplTest.java     |  6 +--
 ...lineProcessConfigurationPersistServiceTest.java |  9 ++--
 .../metadata/node/PipelineMetaDataNodeTest.java    |  6 +--
 ...hangedJobConfigurationProcessorFactoryTest.java |  7 +--
 .../core/util/JobConfigurationBuilder.java         |  4 +-
 .../job/JobTypeFactoryTest.java}                   | 21 +++++---
 34 files changed, 205 insertions(+), 151 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index c4e76b72991..b5e4f2cbd2a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 import java.util.List;
 
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
similarity index 56%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
index 71724be7cbe..5b87a771edc 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/type/AbstractJobType.java
@@ -15,50 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.job.type;
 
 import com.google.common.base.Preconditions;
 import lombok.Getter;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.stream.Collectors;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 /**
- * Job type.
+ * Abstract job type.
  */
 @Getter
-public enum JobType {
-    
-    MIGRATION("MIGRATION", "01"),
-    CONSISTENCY_CHECK("CONSISTENCY_CHECK", "02");
-    
-    private static final Map<String, JobType> CODE_JOB_TYPE_MAP;
-    
-    static {
-        CODE_JOB_TYPE_MAP = Arrays.stream(JobType.values()).collect(Collectors.toMap(JobType::getTypeCode, each -> each));
-    }
+public abstract class AbstractJobType implements JobType {
     
     private final String typeName;
     
-    private final String lowercaseTypeName;
-    
     private final String typeCode;
     
-    JobType(final String typeName, final String typeCode) {
+    public AbstractJobType(final String typeName, final String typeCode) {
         this.typeName = typeName;
-        lowercaseTypeName = typeName.toLowerCase();
         Preconditions.checkArgument(2 == typeCode.length(), "code length is not 2");
         this.typeCode = typeCode;
     }
-    
-    /**
-     * Value of by code.
-     *
-     * @param typeCode type code
-     * @return job type, might be null
-     */
-    public static JobType valueOfByCode(final String typeCode) {
-        return CODE_JOB_TYPE_MAP.get(typeCode);
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
similarity index 62%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
index 517fbe08fed..b80ae2aa106 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobType.java
@@ -15,30 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.spi.job;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 
 /**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Job type.
  */
-public interface PipelineMetaDataPersistService<T> {
+@SingletonSPI
+public interface JobType {
     
     /**
-     * Load meta data.
+     * Get type name.
      *
-     * @param jobType job type, nullable
-     * @return configurations
+     * @return type name
      */
-    T load(JobType jobType);
+    String getTypeName();
     
     /**
-     * Persist meta data.
+     * Get type code.
      *
-     * @param jobType job type, nullable
-     * @param configs configurations
+     * @return type code
      */
-    void persist(JobType jobType, T configs);
+    String getTypeCode();
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java
new file mode 100644
index 00000000000..468e22f5d33
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.job;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Job type factory.
+ */
+@Slf4j
+public final class JobTypeFactory {
+    
+    private static final Map<String, JobType> CODE_JOB_TYPE_MAP = new ConcurrentHashMap<>();
+    
+    static {
+        ShardingSphereServiceLoader.register(JobType.class);
+        for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
+            String typeCode = each.getTypeCode();
+            JobType replaced = CODE_JOB_TYPE_MAP.put(typeCode, each);
+            if (replaced != null) {
+                log.error("Type code already exists, typeCode={}, replaced={}, current={}", typeCode, replaced, each, new Exception());
+            }
+        }
+    }
+    
+    /**
+     * Get job type instance.
+     *
+     * @param jobTypeCode job type code
+     * @return job type
+     */
+    public static JobType getInstance(final String jobTypeCode) {
+        JobType result = CODE_JOB_TYPE_MAP.get(jobTypeCode);
+        Preconditions.checkNotNull(result, "Can not get job type by `%s`", jobTypeCode);
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index f910062508b..75fb805a811 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.Collection;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 52972546a8d..1fe434122bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -23,11 +23,11 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
index 517fbe08fed..18dd0e68a9d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 /**
  * Pipeline meta data persist service.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index d150ccf2811..37c27a7942e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -21,11 +21,11 @@ import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResultSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.yaml.consistency.YamlDataConsistencyCheckResultSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index ae457b2ddd4..d7132d74318 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Strings;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
index b16c95fa6c7..4d7f932bd3e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -19,11 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Strings;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 762eb3eeb79..96a75df0559 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -19,15 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import com.google.common.base.Preconditions;
 import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 /**
  * Abstract pipeline job id.
  */
 @Getter
-@ToString
 public abstract class AbstractPipelineJobId implements PipelineJobId {
     
     private final JobType jobType;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index e6773a51cbb..5b6ddcc0a80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -20,8 +20,9 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
 
 /**
  * Pipeline job id utils.
@@ -49,8 +50,6 @@ public final class PipelineJobIdUtils {
         Preconditions.checkArgument(jobId.length() > 3, "Invalid jobId length, jobId=%s", jobId);
         Preconditions.checkArgument('j' == jobId.charAt(0), "Invalid jobId, first char=%s", jobId.charAt(0));
         String typeCode = jobId.substring(1, 3);
-        JobType result = JobType.valueOfByCode(typeCode);
-        Preconditions.checkNotNull(result, "Can not get job type by `%s`, job ID is `%s`", typeCode, jobId);
-        return result;
+        return JobTypeFactory.getInstance(typeCode);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
similarity index 59%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
index 517fbe08fed..6f285e34aea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/ConsistencyCheckJobType.java
@@ -15,30 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.core.job.type;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.type.AbstractJobType;
 
 /**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Consistency check job type.
  */
-public interface PipelineMetaDataPersistService<T> {
+public final class ConsistencyCheckJobType extends AbstractJobType {
     
-    /**
-     * Load meta data.
-     *
-     * @param jobType job type, nullable
-     * @return configurations
-     */
-    T load(JobType jobType);
+    public static final String TYPE_CODE = "02";
     
-    /**
-     * Persist meta data.
-     *
-     * @param jobType job type, nullable
-     * @param configs configurations
-     */
-    void persist(JobType jobType, T configs);
+    public ConsistencyCheckJobType() {
+        super("CONSISTENCY_CHECK", TYPE_CODE);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 33e3eb7bcdb..14659824fc6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 import java.util.regex.Pattern;
 
@@ -49,7 +49,7 @@ public final class PipelineMetaDataNode {
     private static String getMetaDataRootPath(final JobType jobType) {
         return null == jobType
                 ? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "metadata")
-                : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getLowercaseTypeName(), "metadata");
+                : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getTypeName().toLowerCase(), "metadata");
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 71a5a4febc6..98e7d85aa87 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
new file mode 100644
index 00000000000..51dc1de8e14
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 5f5158f29b0..085917dd0aa 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import lombok.Getter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 
 /**
@@ -45,7 +45,7 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     }
     
     public ConsistencyCheckJobId(final String parentJobId, final int sequence) {
-        super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
+        super(new ConsistencyCheckJobType(), CURRENT_VERSION);
         this.parentJobId = parentJobId;
         this.sequence = sequence > ConsistencyCheckSequence.MAX_SEQUENCE ? ConsistencyCheckSequence.MIN_SEQUENCE : sequence;
     }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
index 0cbfb235968..02982d33aef 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
@@ -45,11 +44,14 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -278,6 +280,6 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public JobType getJobType() {
-        return JobType.CONSISTENCY_CHECK;
+        return JobTypeFactory.getInstance(ConsistencyCheckJobType.TYPE_CODE);
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index 77dd7c346b7..d4c9978ebfd 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -18,10 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -81,6 +81,6 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
     
     @Override
     public String getType() {
-        return JobType.CONSISTENCY_CHECK.getTypeName();
+        return new ConsistencyCheckJobType().getTypeName();
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index b8e3f417e7e..3f6752b9a06 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -38,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.Con
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 
 import java.sql.SQLException;
 import java.util.Map;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index c93e338e300..15cc3991430 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
 
 /**
@@ -44,7 +43,7 @@ public final class MigrationJobId extends AbstractPipelineJobId {
     
     public MigrationJobId(@NonNull final String sourceResourceName, final String sourceSchemaName, @NonNull final String sourceTableName,
                           @NonNull final String targetDatabaseName, @NonNull final String targetTableName) {
-        super(JobType.MIGRATION, CURRENT_VERSION);
+        super(new MigrationJobType(), CURRENT_VERSION);
         this.sourceResourceName = sourceResourceName;
         this.sourceSchemaName = sourceSchemaName;
         this.sourceTableName = sourceTableName;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
similarity index 59%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 517fbe08fed..7fd75061041 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -15,30 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.api;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.type.AbstractJobType;
 
 /**
- * Pipeline meta data persist service.
- *
- * @param <T> type of configuration
+ * Migration job type.
  */
-public interface PipelineMetaDataPersistService<T> {
+public final class MigrationJobType extends AbstractJobType {
     
-    /**
-     * Load meta data.
-     *
-     * @param jobType job type, nullable
-     * @return configurations
-     */
-    T load(JobType jobType);
+    public static final String TYPE_CODE = "01";
     
-    /**
-     * Persist meta data.
-     *
-     * @param jobType job type, nullable
-     * @param configs configurations
-     */
-    void persist(JobType jobType, T configs);
+    public MigrationJobType() {
+        super("MIGRATION", TYPE_CODE);
+    }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
index e4c6a8d4b0d..a04dd264549 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
@@ -40,7 +40,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
@@ -59,16 +58,20 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -256,7 +259,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         super.startDisabledJob(jobId);
         PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).startDisabledJob(optional);
+                PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -269,7 +272,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     public void stop(final String jobId) {
         PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).stop(optional);
+                PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()).stop(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -399,7 +402,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     @Override
     public String createJobAndStart(final CreateMigrationJobParameter param) {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
-        Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(JobType.MIGRATION);
+        Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(new MigrationJobType());
         Map<String, Object> sourceDataSourceProps = swapper.swapToMap(metaDataDataSource.get(param.getSourceResourceName()));
         YamlPipelineDataSourceConfiguration sourcePipelineDataSourceConfig = createYamlPipelineDataSourceConfiguration(
                 StandardPipelineDataSourceConfiguration.TYPE, YamlEngine.marshal(sourceDataSourceProps));
@@ -458,7 +461,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     
     @Override
     public JobType getJobType() {
-        return JobType.MIGRATION;
+        return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index 9f485f4365d..83d6a791561 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -18,13 +18,13 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
@@ -86,6 +86,6 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
     
     @Override
     public String getType() {
-        return JobType.MIGRATION.getTypeName();
+        return new MigrationJobType().getTypeName();
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
new file mode 100644
index 00000000000..c1ca2ef098d
--- /dev/null
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.job.JobType
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType
diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 98ccd8398d2..2a1d6e902a7 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class PipelineJobIdUtilsTest {
@@ -31,6 +32,6 @@ public final class PipelineJobIdUtilsTest {
         MigrationJobId pipelineJobId = new MigrationJobId("ds_0", null, "t_order", "sharding_db", "t_order");
         String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
         JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
-        assertThat(actualJobType, is(JobType.MIGRATION));
+        assertThat(actualJobType, instanceOf(MigrationJobType.class));
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 613d82a122b..5274d5e9364 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
 import org.junit.Test;
 
@@ -30,11 +31,11 @@ public final class PipelineAPIFactoryTest {
     
     @Test
     public void assertGetPipelineJobAPI() {
-        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
+        assertThat(PipelineAPIFactory.getPipelineJobAPI(new MigrationJobType()), instanceOf(MigrationJobAPIImpl.class));
     }
     
     @Test
     public void assertGetConsistencyCheckJobAPI() {
-        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckJobAPIImpl.class));
+        assertThat(PipelineAPIFactory.getPipelineJobAPI(new ConsistencyCheckJobType()), instanceOf(ConsistencyCheckJobAPIImpl.class));
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index f4859f279a0..8810d1f5ee0 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -18,9 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -34,7 +35,7 @@ public final class PipelineJobPublicAPIFactoryTest {
     @Test
     public void assertGetInventoryIncrementalJobPublicAPI() {
         Collection<Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>>> paramResult = new LinkedList<>();
-        paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
+        paramResult.add(Pair.of(new MigrationJobType(), MigrationJobAPIImpl.class));
         for (Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>> each : paramResult) {
             assertThat(PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
         }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index d64ff38dd71..69b5965350c 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
@@ -35,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineD
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -243,7 +243,7 @@ public final class MigrationJobAPIImplTest {
     @Test
     public void assertAddMigrationSourceResources() {
         PipelineDataSourcePersistService persistService = new PipelineDataSourcePersistService();
-        Map<String, DataSourceProperties> actual = persistService.load(JobType.MIGRATION);
+        Map<String, DataSourceProperties> actual = persistService.load(new MigrationJobType());
         assertTrue(actual.containsKey("ds_0"));
     }
     
@@ -259,7 +259,7 @@ public final class MigrationJobAPIImplTest {
     }
     
     private void initIntPrimaryEnvironment() throws SQLException {
-        Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(JobType.MIGRATION);
+        Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(new MigrationJobType());
         DataSourceProperties dataSourceProps = metaDataDataSource.get("ds_0");
         DataSource dataSource = DataSourcePoolCreator.create(dataSourceProps);
         try (
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
index dddc2c8ce9f..5c7f50a7e66 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -18,13 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 import org.junit.BeforeClass;
@@ -56,7 +57,7 @@ public final class PipelineProcessConfigurationPersistServiceTest {
         String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
         PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
         PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
-        JobType jobType = JobType.MIGRATION;
+        JobType jobType = new MigrationJobType();
         persistService.persist(jobType, processConfig);
         String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(jobType)));
         assertThat(actualYamlText, is(expectedYamlText));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
similarity index 93%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 03c30a92b99..fc025e94fa1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.node;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -37,12 +37,12 @@ public final class PipelineMetaDataNodeTest {
     
     @Test
     public void assertGetMetaDataDataSourcesPath() {
-        assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(JobType.MIGRATION), is(migrationMetaDataRootPath + "/data_sources"));
+        assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(new MigrationJobType()), is(migrationMetaDataRootPath + "/data_sources"));
     }
     
     @Test
     public void assertGetMetaDataProcessConfigPath() {
-        assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(JobType.MIGRATION), is(migrationMetaDataRootPath + "/process_config"));
+        assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(new MigrationJobType()), is(migrationMetaDataRootPath + "/process_config"));
     }
     
     @Test
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
index e9c2fa7af81..e60fb222d2c 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
@@ -17,8 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
 import org.junit.Test;
 
@@ -29,7 +30,7 @@ public final class PipelineChangedJobConfigurationProcessorFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.MIGRATION), instanceOf(MigrationChangedJobConfigurationProcessor.class));
-        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new MigrationJobType()), instanceOf(MigrationChangedJobConfigurationProcessor.class));
+        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new ConsistencyCheckJobType()), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 0317b95a374..85defe6b77b 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -25,9 +25,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -55,7 +55,7 @@ public final class JobConfigurationBuilder {
         result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
                 ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
         result.setUniqueKeyColumn(createYamlPipelineColumnMetaData());
-        PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
+        PipelineAPIFactory.getPipelineJobAPI(new MigrationJobType()).extendYamlJobConfiguration(result);
         return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
     }
     
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
similarity index 58%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
index e9c2fa7af81..3dc726ff68d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/job/JobTypeFactoryTest.java
@@ -15,21 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
+package org.apache.shardingsphere.data.pipeline.spi.job;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class PipelineChangedJobConfigurationProcessorFactoryTest {
+public final class JobTypeFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.MIGRATION), instanceOf(MigrationChangedJobConfigurationProcessor.class));
-        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+        Collection<Pair<String, Class<? extends JobType>>> paramResult = Arrays.asList(
+                Pair.of(MigrationJobType.TYPE_CODE, MigrationJobType.class), Pair.of(ConsistencyCheckJobType.TYPE_CODE, ConsistencyCheckJobType.class));
+        for (Pair<String, Class<? extends JobType>> each : paramResult) {
+            JobType actual = JobTypeFactory.getInstance(each.getKey());
+            assertThat(actual, instanceOf(each.getValue()));
+        }
     }
 }