You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/09/15 02:28:28 UTC

[shardingsphere] branch master updated: Adjust pipeline SPI class name and package name; Revise javadoc (#20983)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 943f5ee5064 Adjust pipeline SPI class name and package name; Revise javadoc (#20983)
943f5ee5064 is described below

commit 943f5ee50642170f33aa718da7258462d170ff07
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Sep 15 10:28:17 2022 +0800

    Adjust pipeline SPI class name and package name; Revise javadoc (#20983)
    
    * Rename and move metadata changed event handler
    
    * Move rate limiter impl
    
    * Clean unused PipelineSchemaTableUtil.getSchemaTablesMapFromActual method
    
    * Improve PipelineJobPublicAPIFactoryTest
    
    * Use drop table SQL for rollback
    
    * Improve log and javadoc
---
 .../pipeline/api/PipelineJobPublicAPIFactory.java  |  2 +-
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  6 ++--
 .../metadata/node/PipelineMetaDataNodeWatcher.java | 18 +++++-----
 .../PipelineChangedJobConfigurationProcessor.java} | 14 ++++----
 ...neChangedJobConfigurationProcessorFactory.java} | 16 ++++-----
 .../PipelineMetaDataChangedEventHandler.java}      |  8 ++---
 ...ipelineMetaDataChangedEventHandlerFactory.java} | 16 ++++-----
 .../impl/BarrierMetaDataChangedEventHandler.java}  |  7 ++--
 .../impl/ChangedJobConfigurationDispatcher.java}   | 19 +++++-----
 .../ratelimit/QPSJobRateLimitAlgorithm.java        |  2 +-
 .../ratelimit/TPSJobRateLimitAlgorithm.java        |  2 +-
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  4 +--
 .../core/util/PipelineSchemaTableUtil.java         | 42 ++--------------------
 .../core/util/PipelineTableMetaDataUtil.java       |  9 +++--
 ...MigrationChangedJobConfigurationProcessor.java} | 22 ++++++------
 .../scenario/migration/MigrationJobAPIImpl.java    |  2 +-
 ...ndler.PipelineChangedJobConfigurationProcessor} |  2 +-
 ...nt.handler.PipelineMetaDataChangedEventHandler} |  4 +--
 ...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm |  4 +--
 .../fixture/FixturePipelineSQLBuilder.java         |  2 +-
 ...ineMetaDataChangedEventHandlerFactoryTest.java} | 14 ++++----
 .../src/main/resources/logback.xml                 |  2 ++
 .../api/PipelineJobPublicAPIFactoryTest.java       | 14 ++++++++
 23 files changed, 109 insertions(+), 122 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 958d6943872..4443bbc6db1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 public final class PipelineJobPublicAPIFactory {
     
     static {
-        ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
         ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
+        ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 1c88196bab9..ad2f8365d7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -98,13 +98,13 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
     String buildDeleteSQL(String schemaName, DataRecord dataRecord, Collection<Column> conditionColumns);
     
     /**
-     * Build truncate SQL.
+     * Build drop SQL.
      *
      * @param schemaName schema name
      * @param tableName table name
-     * @return truncate SQL
+     * @return drop SQL
      */
-    String buildTruncateSQL(String schemaName, String tableName);
+    String buildDropSQL(String schemaName, String tableName);
     
     /**
      * Build count SQL.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index 18dec825008..70326d46e58 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler;
-import org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandlerFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandlerFactory;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 import java.util.Collection;
@@ -31,25 +31,25 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
 /**
- * Pipeline metaData nodeWatcher.
+ * Pipeline meta data node watcher.
  */
 @Slf4j
 public final class PipelineMetaDataNodeWatcher {
     
     private static final PipelineMetaDataNodeWatcher INSTANCE = new PipelineMetaDataNodeWatcher();
     
-    private final Map<Pattern, PipelineMetaDataChangedHandler> listenerMap = new ConcurrentHashMap<>();
+    private final Map<Pattern, PipelineMetaDataChangedEventHandler> listenerMap = new ConcurrentHashMap<>();
     
     private PipelineMetaDataNodeWatcher() {
-        Collection<PipelineMetaDataChangedHandler> instances = PipelineMetaDataChangedHandlerFactory.findAllInstances();
-        for (PipelineMetaDataChangedHandler each : instances) {
+        Collection<PipelineMetaDataChangedEventHandler> instances = PipelineMetaDataChangedEventHandlerFactory.findAllInstances();
+        for (PipelineMetaDataChangedEventHandler each : instances) {
             listenerMap.put(each.getKeyPattern(), each);
         }
         PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
     }
     
     private void dispatchEvent(final DataChangedEvent event) {
-        for (Entry<Pattern, PipelineMetaDataChangedHandler> entry : listenerMap.entrySet()) {
+        for (Entry<Pattern, PipelineMetaDataChangedEventHandler> entry : listenerMap.entrySet()) {
             if (entry.getKey().matcher(event.getKey()).matches()) {
                 entry.getValue().handle(event);
                 return;
@@ -58,9 +58,9 @@ public final class PipelineMetaDataNodeWatcher {
     }
     
     /**
-     * Get pipeline metaData nodeWatcher instance.
+     * Get instance.
      *
-     * @return pipeline metaData nodeWatcher
+     * @return instance
      */
     public static PipelineMetaDataNodeWatcher getInstance() {
         return INSTANCE;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
similarity index 71%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index 02ef7febb6f..ea8edfdbb57 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 /**
- * Pipeline job config changed event processor.
+ * Pipeline changed job configuration processor.
  */
-public interface PipelineJobConfigurationChangedEventProcessor extends TypedSPI {
+public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
     
     /**
-     * Process data changed event.
+     * Process changed job configuration.
      *
-     * @param jobConfigPOJO job config
-     * @param event event
+     * @param eventType event type
+     * @param jobConfigPOJO job configuration pojo
      */
-    void process(DataChangedEvent event, JobConfigurationPOJO jobConfigPOJO);
+    void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
similarity index 66%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 2ae73d39051..71a5a4febc6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -24,22 +24,22 @@ import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 /**
- * Pipeline job config changed event processor factory.
+ * Pipeline changed job configuration processor factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineJobConfigurationChangedEventProcessorFactory {
+public final class PipelineChangedJobConfigurationProcessorFactory {
     
     static {
-        ShardingSphereServiceLoader.register(PipelineJobConfigurationChangedEventProcessor.class);
+        ShardingSphereServiceLoader.register(PipelineChangedJobConfigurationProcessor.class);
     }
     
     /**
-     * Get job changed event processor instance.
+     * Get instance.
      *
      * @param jobType job type
-     * @return pipeline event process
+     * @return instance
      */
-    public static PipelineJobConfigurationChangedEventProcessor getInstance(final JobType jobType) {
-        return TypedSPIRegistry.getRegisteredService(PipelineJobConfigurationChangedEventProcessor.class, jobType.getTypeName());
+    public static PipelineChangedJobConfigurationProcessor getInstance(final JobType jobType) {
+        return TypedSPIRegistry.getRegisteredService(PipelineChangedJobConfigurationProcessor.class, jobType.getTypeName());
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
index 9b963e76a82..4b8bde727c7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.regex.Pattern;
 
 /**
- * Pipeline meta data changed handler.
+ * Pipeline meta data changed event handler.
  */
 @SingletonSPI
-public interface PipelineMetaDataChangedHandler {
+public interface PipelineMetaDataChangedEventHandler {
     
     /**
      * Get key pattern.
@@ -36,7 +36,7 @@ public interface PipelineMetaDataChangedHandler {
     Pattern getKeyPattern();
     
     /**
-     * Event changed handler.
+     * Handle meta data changed event.
      *
      * @param event changed event
      */
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
similarity index 73%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
index 0d746d5db8c..21d0d94f514 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactory.java
@@ -15,27 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 
 import java.util.Collection;
 
 /**
- * Pipeline meta data listener factory.
+ * Pipeline meta data changed event handler factory.
  */
-public final class PipelineMetaDataChangedHandlerFactory {
+public final class PipelineMetaDataChangedEventHandlerFactory {
     
     static {
-        ShardingSphereServiceLoader.register(PipelineMetaDataChangedHandler.class);
+        ShardingSphereServiceLoader.register(PipelineMetaDataChangedEventHandler.class);
     }
     
     /**
-     * Get pipeline meta data listener instance.
+     * Get instances.
      *
-     * @return pipeline meta data listener
+     * @return instances
      */
-    public static Collection<PipelineMetaDataChangedHandler> findAllInstances() {
-        return ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedHandler.class);
+    public static Collection<PipelineMetaDataChangedEventHandler> findAllInstances() {
+        return ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
similarity index 81%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index 676baa8efa4..95d32aa87b4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -26,10 +27,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.regex.Pattern;
 
 /**
- * Barrier pipeline meta data handler, .
+ * Barrier meta data changed event handler.
  */
 @Slf4j
-public final class BarrierMetaDataChangedHandler implements PipelineMetaDataChangedHandler {
+public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDataChangedEventHandler {
     
     @Override
     public Pattern getKeyPattern() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index f6e849f3902..18da7257ad5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor;
-import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -29,10 +30,10 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.regex.Pattern;
 
 /**
- * Job config changed handler.
+ * Changed job configuration dispatcher.
  */
 @Slf4j
-public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
+public final class ChangedJobConfigurationDispatcher implements PipelineMetaDataChangedEventHandler {
     
     @Override
     public Pattern getKeyPattern() {
@@ -41,17 +42,17 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
     
     @Override
     public void handle(final DataChangedEvent event) {
-        log.info("{} job config: {}", event.getType(), event.getKey());
         JobConfigurationPOJO jobConfigPOJO;
         try {
             jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
-            log.error("analyze job config pojo failed.", ex);
+            log.error("unmarshal job configuration pojo failed.", ex);
             return;
         }
-        PipelineJobConfigurationChangedEventProcessor processor = PipelineJobConfigurationChangedEventProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
-        processor.process(event, jobConfigPOJO);
+        log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfigPOJO.isDisabled());
+        PipelineChangedJobConfigurationProcessor processor = PipelineChangedJobConfigurationProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
+        processor.process(event.getType(), jobConfigPOJO);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
index 5a095585b94..8aad098b1c4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/QPSJobRateLimitAlgorithm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
 
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
index 8c3c4fc52b4..4a5cb6b426c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
 
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 3914d0294ab..dca4aefb36a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -162,8 +162,8 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildTruncateSQL(final String schemaName, final String tableName) {
-        return String.format("TRUNCATE TABLE %s", decorate(schemaName, tableName));
+    public String buildDropSQL(final String schemaName, final String tableName) {
+        return String.format("DROP TABLE IF EXISTS %s", decorate(schemaName, tableName));
     }
     
     private String buildDeleteSQLInternal(final String schemaName, final String tableName, final Collection<Column> conditionColumns) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
index c911d5481fa..08618150c19 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
@@ -17,60 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.util;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 /**
  * Pipeline schema table util.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 @Slf4j
 public final class PipelineSchemaTableUtil {
     
-    private PipelineSchemaTableUtil() {
-    }
-    
-    /**
-     * Get schema tables map from actual data source.
-     *
-     * @param pipelineDataSourceConfig pipeline data source config
-     * @param schemaName schema name,
-     * @param tableName table name
-     * @return schema tables map
-     */
-    public static Map<String, List<String>> getSchemaTablesMapFromActual(final PipelineDataSourceConfiguration pipelineDataSourceConfig, final String schemaName, final String tableName) {
-        log.info("start get schema tables from actual, begin:{}", LocalDateTime.now());
-        Map<String, List<String>> result = new HashMap<>();
-        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig)) {
-            try (Connection connection = dataSource.getConnection()) {
-                DatabaseMetaData metaData = connection.getMetaData();
-                String targetSchema = ObjectUtils.defaultIfNull(schemaName, connection.getSchema());
-                ResultSet resultSet = metaData.getTables(connection.getCatalog(), targetSchema, tableName, new String[]{"TABLE"});
-                while (resultSet.next()) {
-                    result.computeIfAbsent(targetSchema, k -> new ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
-                }
-                log.info("get schema tables success, catalog:{}, schema:{}, table:{}, result:{}, end:{}", targetSchema, connection.getCatalog(), tableName, resultSet, LocalDateTime.now());
-            }
-        } catch (final SQLException ex) {
-            log.error("get schema name map error", ex);
-            throw new RuntimeException(ex.getMessage());
-        }
-        return result;
-    }
-    
     /**
      * Get default schema by connection.getSchema().
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
index eb8b74d6f87..26bc40b2c27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.util;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -36,6 +38,7 @@ import java.util.List;
 /**
  * Pipeline table meta data util.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineTableMetaDataUtil {
     
     /**
@@ -44,7 +47,7 @@ public final class PipelineTableMetaDataUtil {
      * @param schemaName schema name
      * @param tableName table name
      * @param dataSourceConfig source configuration
-     * @param loader pipeline table meta data loader* @return pipeline table meta data
+     * @param loader pipeline table meta data loader
      * @return pipeline table meta data
      */
     @SneakyThrows(SQLException.class)
@@ -74,7 +77,7 @@ public final class PipelineTableMetaDataUtil {
     }
     
     /**
-     * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+     * Get unique key column.
      *
      * @param schemaName schema name
      * @param tableName table name
@@ -89,7 +92,7 @@ public final class PipelineTableMetaDataUtil {
     }
     
     /**
-     * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+     * Get unique key column.
      *
      * @param schemaName schema name
      * @param tableName table name
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
similarity index 78%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 75e931ba70f..b54a5a493c8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.process;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -33,25 +32,26 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Migration job config event processor.
+ * Migration job configuration changed processor.
  */
 @Slf4j
-public final class MigrationJobConfigurationChangedEventProcessor implements PipelineJobConfigurationChangedEventProcessor {
+public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
     @Override
-    public void process(final DataChangedEvent event, final JobConfigurationPOJO jobConfigPOJO) {
+    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
             PipelineJobCenter.stop(jobId);
             return;
         }
-        switch (event.getType()) {
+        switch (eventType) {
             case ADDED:
             case UPDATED:
-                if (PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
-                    log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    log.info("{} executing jobs", jobConfigPOJO.getJobName());
+                    log.info("{} executing jobs", jobId);
                     CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
                 }
                 break;
@@ -59,7 +59,7 @@ public final class MigrationJobConfigurationChangedEventProcessor implements Pip
                 log.info("deleted jobId={}", jobId);
                 MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
                 new MigrationJobPreparer().cleanup(jobConfig);
-                PipelineJobCenter.stop(jobConfigPOJO.getJobName());
+                PipelineJobCenter.stop(jobId);
                 break;
             default:
                 break;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index e68b75c7a1f..bc2dc0c57ae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -362,7 +362,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
-            String sql = pipelineSQLBuilder.buildTruncateSQL(targetSchemaName, targetTableName);
+            String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, targetTableName);
             log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, targetTableName, sql);
             try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                 preparedStatement.execute();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 55caeea6a01..89ef92e051e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.process.MigrationJobConfigurationChangedEventProcessor
+org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationChangedJobConfigurationProcessor
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
similarity index 77%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
index 2d7ae078cf4..56b2c72dd8a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.handler.JobConfigurationChangedHandler
-org.apache.shardingsphere.data.pipeline.core.spi.handler.BarrierMetaDataChangedHandler
+org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.ChangedJobConfigurationDispatcher
+org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.BarrierMetaDataChangedEventHandler
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index ed125966a01..a5df6f2d7a4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.QPSJobRateLimitAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.TPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.ratelimit.QPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.ratelimit.TPSJobRateLimitAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index b72cfd6d331..f58081c17f8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -57,7 +57,7 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildTruncateSQL(final String schemaName, final String tableName) {
+    public String buildDropSQL(final String schemaName, final String tableName) {
         return "";
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
index d9a449bdb2b..0ace293e4c5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineMetaDataChangedEventHandlerFactoryTest.java
@@ -15,27 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.BarrierMetaDataChangedEventHandler;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl.ChangedJobConfigurationDispatcher;
 import org.junit.Test;
 
 import java.util.Collection;
 
 import static org.junit.Assert.assertTrue;
 
-public final class PipelineMetaDataChangedHandlerFactoryTest {
+public final class PipelineMetaDataChangedEventHandlerFactoryTest {
     
     @Test
     public void assertFindInstance() {
-        Collection<PipelineMetaDataChangedHandler> actual = PipelineMetaDataChangedHandlerFactory.findAllInstances();
+        Collection<PipelineMetaDataChangedEventHandler> actual = PipelineMetaDataChangedEventHandlerFactory.findAllInstances();
         boolean isContainMigration = false;
         boolean isContainBarrier = false;
-        for (PipelineMetaDataChangedHandler each : actual) {
-            if (each instanceof JobConfigurationChangedHandler) {
+        for (PipelineMetaDataChangedEventHandler each : actual) {
+            if (each instanceof ChangedJobConfigurationDispatcher) {
                 isContainMigration = true;
                 continue;
             }
-            if (each instanceof BarrierMetaDataChangedHandler) {
+            if (each instanceof BarrierMetaDataChangedEventHandler) {
                 isContainBarrier = true;
             }
         }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
index efeb323b7d0..bb213a9e6d7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/logback.xml
@@ -32,6 +32,8 @@
     
     <logger name="io.netty" level="error" />
     
+    <logger name="org.apache.shardingsphere.elasticjob" level="debug" />
+    
     <root>
         <level value="info" />
         <appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index 1060131fd4e..6a3e1edc364 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -17,14 +17,28 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
+import java.util.Collection;
+import java.util.LinkedList;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
 public final class PipelineJobPublicAPIFactoryTest {
     
+    @Test
+    public void assertGetPipelineJobPublicAPI() {
+        Collection<Pair<JobType, Class<? extends PipelineJobPublicAPI>>> paramResult = new LinkedList<>();
+        paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
+        for (Pair<JobType, Class<? extends PipelineJobPublicAPI>> each : paramResult) {
+            assertThat(PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(each.getKey().getTypeName()), instanceOf(each.getValue()));
+        }
+    }
+    
     @Test
     public void assertGetMigrationJobPublicAPI() {
         assertThat(PipelineJobPublicAPIFactory.getMigrationJobPublicAPI(), instanceOf(MigrationJobAPIImpl.class));