You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/04 14:50:28 UTC

[shardingsphere] branch master updated: Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 769ff9079b7 Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
769ff9079b7 is described below

commit 769ff9079b79e67030f68d52222de6305cfaab7c
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun Sep 4 22:50:20 2022 +0800

    Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
    
    * Extract PipelineTableMetaDataLoader as interface and move to api module
    
    * Move dumper creator SPI to api module
    
    * Move out dumper interface from spi package
    
    * Improve code style
---
 .../DataConsistencyCalculateParameter.java         |  2 +-
 .../{spi => api}/ingest/dumper/Dumper.java         |  2 +-
 .../ingest/dumper/IncrementalDumper.java           |  2 +-
 .../ingest/dumper/InventoryDumper.java             |  2 +-
 .../loader/PipelineTableMetaDataLoader.java}       | 27 +++++------
 .../{ => model}/PipelineColumnMetaData.java        |  2 +-
 .../api}/metadata/model/PipelineIndexMetaData.java |  3 +-
 .../api}/metadata/model/PipelineTableMetaData.java |  3 +-
 .../ingest/dumper/IncrementalDumperCreator.java    | 12 ++---
 .../dumper/IncrementalDumperCreatorFactory.java    |  4 +-
 .../spi}/ingest/dumper/InventoryDumperCreator.java | 23 +++++-----
 .../dumper/InventoryDumperCreatorFactory.java      | 14 +++---
 .../ingest/dumper/AbstractIncrementalDumper.java   |  4 +-
 .../ingest/dumper/AbstractInventoryDumper.java     |  6 +--
 .../core/ingest/dumper/DefaultInventoryDumper.java |  2 +-
 .../dumper/DefaultInventoryDumperCreator.java      | 10 ++++-
 ...va => StandardPipelineTableMetaDataLoader.java} | 22 ++++-----
 .../core/prepare/InventoryTaskSplitter.java        |  8 ++--
 .../data/pipeline/core/task/IncrementalTask.java   |  6 +--
 .../data/pipeline/core/task/InventoryTask.java     |  6 +--
 .../migration/MigrationDataConsistencyChecker.java |  8 ++--
 .../migration/MigrationJobItemContext.java         |  5 ++-
 .../scenario/migration/MigrationJobPreparer.java   |  2 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  2 +-
 .../mysql/MySQLIncrementalDumperCreator.java       |  6 +--
 .../mysql/MySqlInventoryDumperCreator.java         |  9 ++--
 .../mysql/ingest/MySQLIncrementalDumper.java       |  6 +--
 .../mysql/ingest/MySQLInventoryDumper.java         |  2 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  9 ++--
 .../mysql/ingest/MySQLInventoryDumperTest.java     |  4 +-
 .../OpenGaussIncrementalDumperCreator.java         |  6 +--
 .../opengauss/ingest/OpenGaussWalDumper.java       |  2 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 .../PostgreSQLIncrementalDumperCreator.java        |  6 +--
 .../PostgreSQLInventoryDumperCreator.java          |  6 +--
 .../ingest/PostgreSQLInventoryDumper.java          |  2 +-
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  2 +-
 .../postgresql/ingest/wal/WalEventConverter.java   |  4 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 .../ingest/PostgreSQLJdbcDumperTest.java           |  4 +-
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  5 ++-
 .../ingest/wal/WalEventConverterTest.java          |  4 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  7 +--
 .../IncrementalDumperCreatorFactoryTest.java       | 52 +++++++++-------------
 .../dumper/InventoryDumperCreatorFactoryTest.java  | 43 +++++++-----------
 .../core/fixture/FixtureIncrementalDumper.java     |  2 +-
 .../fixture/FixtureIncrementalDumperCreator.java   | 13 +++---
 .../core/fixture/FixtureInventoryDumper.java       |  2 +-
 .../fixture/FixtureInventoryDumperCreator.java     | 13 +++---
 ...> StandardPipelineTableMetaDataLoaderTest.java} | 13 +++---
 .../metadata/model/PipelineTableMetaDataTest.java  |  3 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  5 ++-
 .../data/pipeline/core/task/InventoryTaskTest.java |  7 +--
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 58 files changed, 200 insertions(+), 214 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 6d6cb7ecb18..7463fadae10 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -24,7 +24,7 @@ import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
 import java.util.Collection;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
index bd4e101fb61..79724699ab6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
index 6871815663a..db292c1b1bf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 /**
  * Incremental dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
index cd79d9b89d9..9405e29878b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 /**
  * Inventory dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
index 759ce69a0fe..5fff7bb307c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.loader;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 
 /**
- * Pipeline meta data of index.
+ * Pipeline table metadata loader.
  */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class PipelineIndexMetaData {
-    
-    private final String name;
+public interface PipelineTableMetaDataLoader {
     
-    private final List<PipelineColumnMetaData> columns;
+    /**
+     * Get table metadata, load if it does not exist.
+     *
+     * @param schemaName schema name. nullable
+     * @param tableName dedicated table name, not table name pattern
+     * @return table metadata
+     */
+    PipelineTableMetaData getTableMetaData(String schemaName, String tableName);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
index d4476eeb53f..ccc001cf6c1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.metadata;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.NonNull;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
index 759ce69a0fe..2a1713efeac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
 
 import java.util.List;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
index 7a1ed2d9789..c35eb8f463f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
similarity index 82%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 5583e056e1e..6eb1b956145 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -34,11 +34,11 @@ public interface IncrementalDumperCreator<P> extends TypedSPI {
     /**
      * Create incremental dumper.
      *
-     * @param dumperConfig dumperConfig
+     * @param dumperConfig dumper configuration
      * @param position position
      * @param channel channel
-     * @param metaDataLoader metaDataLoader
-     * @return IncrementalDumper
+     * @param metaDataLoader meta data loader
+     * @return incremental dumper
      */
     IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition<P> position,
                                               PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index 4db34d487ef..f81e9fbcd61 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -36,7 +36,7 @@ public class IncrementalDumperCreatorFactory {
      * Incremental dumper creator.
      *
      * @param databaseType database type
-     * @return Incremental dumper creator
+     * @return incremental dumper creator
      */
     public static IncrementalDumperCreator getInstance(final String databaseType) {
         return TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, databaseType);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
similarity index 70%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
index 0bc3df83de2..496e9bb7a66 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
@@ -15,29 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
-import javax.sql.DataSource;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
+import javax.sql.DataSource;
+
 /**
  * Inventory dumper creator.
  */
 @SingletonSPI
-public interface InventoryDumperCreator extends TypedSPI {
+public interface InventoryDumperCreator extends TypedSPI, RequiredSPI {
     
     /**
-     * Create Inventory Dumper.
-     * @param inventoryDumperConfig inventoryDumperConfig
+     * Create inventory dumper.
+     *
+     * @param inventoryDumperConfig inventory dumper configuration
      * @param channel channel
-     * @param sourceDataSource sourceDataSource
-     * @param sourceMetaDataLoader sourceMetaDataLoader
-     * @return InventoryDumper
+     * @param sourceDataSource source data source
+     * @param sourceMetaDataLoader source meta data loader
+     * @return inventory dumper
      */
     InventoryDumper createInventoryDumper(InventoryDumperConfiguration inventoryDumperConfig, PipelineChannel channel,
                                           DataSource sourceDataSource, PipelineTableMetaDataLoader sourceMetaDataLoader);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
similarity index 74%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
index 6c218b56391..6c7b2120630 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
-import java.util.Optional;
-
 /**
  * Inventory dumper creator factory.
  */
@@ -35,13 +34,12 @@ public class InventoryDumperCreatorFactory {
     }
     
     /**
-     * Get inventoryDumper creator instance.
+     * Get inventory dumper creator instance.
      *
-     * @param databaseType databaseType
-     * @return InventoryDumperCreator
+     * @param databaseType database type
+     * @return inventory dumper creator
      */
     public static InventoryDumperCreator getInstance(final String databaseType) {
-        Optional<InventoryDumperCreator> result = TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, databaseType);
-        return result.orElseGet(DefaultInventoryDumperCreator::new);
+        return TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, databaseType).orElseGet(() -> RequiredSPIRegistry.getRegisteredService(InventoryDumperCreator.class));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index f36e16e6f2d..bd396dac9cb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 
 /**
  * Abstract incremental dumper.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 34f0ebee193..78de64e85dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -38,14 +39,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
index 22e9c343dc8..596f740a609 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
index 335b989d454..2860dcb14f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
@@ -19,8 +19,9 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
 
 import javax.sql.DataSource;
 
@@ -34,4 +35,9 @@ public final class DefaultInventoryDumperCreator implements InventoryDumperCreat
                                                  final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
         return new DefaultInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
     }
+    
+    @Override
+    public boolean isDefault() {
+        return true;
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
similarity index 91%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 2b5f943091f..dcc6ed43e6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -21,9 +21,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 
 import java.sql.Connection;
@@ -41,25 +42,18 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline table metadata loader.
+ * Standard pipeline table metadata loader.
  */
 @RequiredArgsConstructor
 @Slf4j
-// TODO PipelineTableMetaDataLoader SPI
-public final class PipelineTableMetaDataLoader {
+public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
     
-    // TODO it doesn't support ShardingSphereDataSource
+    // It doesn't support ShardingSphereDataSource
     private final PipelineDataSourceWrapper dataSource;
     
     private final Map<TableName, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
     
-    /**
-     * Get table metadata, load if it does not exist.
-     *
-     * @param schemaName schema name. nullable
-     * @param tableName dedicated table name, not table name pattern
-     * @return table metadata
-     */
+    @Override
     public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
         PipelineTableMetaData result = tableMetaDataMap.get(new TableName(tableName));
         if (null != result) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index c5d10c6f5b4..f2e3a02064b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -35,14 +35,14 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimary
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index df9150dff41..d6aad311a44 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -26,19 +26,19 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 
 import java.util.Collection;
 import java.util.LinkedList;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 23dbf5ffc77..ba1f187c65d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -26,19 +26,19 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
 
 import javax.sql.DataSource;
 import java.util.List;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 201422f5ed0..3c14a669de0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -174,7 +174,7 @@ public final class MigrationDataConsistencyChecker {
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
             for (String each : Collections.singletonList(sourceTableName)) {
-                PipelineTableMetaData tableMetaData = new PipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
+                PipelineTableMetaData tableMetaData = new StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
                 if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a4470921b..9a72f14c16c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -29,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
@@ -79,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         
         @Override
         protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
-            return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+            return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
         }
     };
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c3cd181a04d..c02cd51ea0f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -28,12 +28,12 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index 04c6b47d9a9..d4240d77e4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
index 6fe23ba7a34..076c700b23b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.mysql;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * MySql incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
index 7697ee5d13c..225fbeb9e9e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import javax.sql.DataSource;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
 
 public class MySqlInventoryDumperCreator implements InventoryDumperCreator {
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 40727cb4ac3..b6f0e056f20 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -31,11 +31,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 2fde33ba951..6fd46b68b3a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 3ee5618cd76..002201f3ec3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -27,12 +27,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -82,7 +83,7 @@ public final class MySQLIncrementalDumperTest {
         initTableData(dumperConfig);
         dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
         channel = new MultiplexMemoryPipelineChannel();
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
         PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
         when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(column);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index 0d8bc4f31fc..d8aaf019c89 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,7 +49,7 @@ public final class MySQLInventoryDumperTest {
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
         PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+        mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
         initTableData(dataSource);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index fce078e1591..4083a4cd690 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.opengauss;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * OpenGauss incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 8362bed849d..84123622fb8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 684f191b380..4b067556abc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * PostgreSQL incremental dumper creator.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
index aedabf2fd75..6394e2e9cb2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
@@ -19,10 +19,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
 
 import javax.sql.DataSource;
 import java.util.Collection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 7f56c647281..3cbf9a158b5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 67f181511ab..c304037e96d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index cc9e26117d4..4ffeabcc458 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index d4388d1473f..2ed82a1616e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,7 +50,7 @@ public final class PostgreSQLJdbcDumperTest {
         InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
         dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
-                dataSource, new PipelineTableMetaDataLoader(dataSource));
+                dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
         initTableData(dataSource);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 7e686c000ec..2ecb055f62d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -24,10 +24,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
@@ -84,7 +85,7 @@ public final class PostgreSQLWalDumperTest {
         position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
         dumperConfig = mockDumperConfiguration();
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 16681b8b044..dd93c267854 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
@@ -59,7 +59,7 @@ public final class WalEventConverterTest {
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig, new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
+        walEventConverter = new WalEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f340101601c..f64fde18e31 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,12 +22,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -141,7 +142,7 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setUniqueKeyDataType(Types.INTEGER);
         dumperConfig.setShardingItem(0);
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
@@ -149,7 +150,7 @@ public final class GovernanceRepositoryAPIImplTest {
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
index 6352be60159..b9b1477b6b8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -17,80 +17,68 @@
 
 package org.apache.shardingsphere.data.pipeline.core.dumper;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import java.util.Collections;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.junit.Before;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 import org.junit.Test;
 import org.mockito.Mock;
 
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
 public final class IncrementalDumperCreatorFactoryTest {
     
-    private PipelineDataSourceWrapper dataSource;
-    
     @Mock
-    private WalPosition walPosition;
-    
-    @Before
-    public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-        DumperConfiguration dumperConfig = mockDumperConfiguration();
-        dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-    }
+    private IngestPosition<?> ingestPosition;
     
     @Test
     public void assertIncrementalDumperCreatorForMysql() {
-        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("MySQL")
-                .createIncrementalDumper(mockDumperConfiguration(), new BinlogPosition("binlog-000001", 4L), new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("MySQL");
         assertThat(actual, instanceOf(MySQLIncrementalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForPostgreSQL() {
-        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("PostgreSQL")
-                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("PostgreSQL");
         assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForOpenGauss() {
-        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("openGauss")
-                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("openGauss");
         assertThat(actual, instanceOf(OpenGaussWalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForFixture() {
-        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("Fixture")
-                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("Fixture");
         assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForH2() {
-        IncrementalDumper actual = IncrementalDumperCreatorFactory.getInstance("H2")
-                .createIncrementalDumper(mockDumperConfiguration(), walPosition, new SimpleMemoryPipelineChannel(100), new PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("H2");
         assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
     }
     
+    private IncrementalDumper createIncrementalDumper(final String databaseType) {
+        return IncrementalDumperCreatorFactory.getInstance(databaseType)
+                .createIncrementalDumper(mockDumperConfiguration(), ingestPosition, new SimpleMemoryPipelineChannel(100), new StandardPipelineTableMetaDataLoader(null));
+    }
+    
     private DumperConfiguration mockDumperConfiguration() {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
index 3a9c90b14d8..d8bcbf36379 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
@@ -17,63 +17,54 @@
 
 package org.apache.shardingsphere.data.pipeline.core.dumper;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
-import org.junit.Before;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
 public final class InventoryDumperCreatorFactoryTest {
     
-    private PipelineDataSourceWrapper dataSource;
-    
-    @Before
-    public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-        InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
-        dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-    }
-    
     @Test
-    public void assertInventoryDumperCreatorForMysql() {
-        InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("MySql")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+    public void assertInventoryDumperCreatorForMySQL() {
+        InventoryDumper actual = createInventoryDumper("MySQL");
         assertThat(actual, instanceOf(MySQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForPostgreSQL() {
-        InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("PostgreSQL")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("PostgreSQL");
         assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForOpenGauss() {
-        InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("openGauss")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("openGauss");
         assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForFixture() {
-        InventoryDumper actual = InventoryDumperCreatorFactory.getInstance("Fixture")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("Fixture");
         assertThat(actual, instanceOf(FixtureInventoryDumper.class));
     }
     
+    private InventoryDumper createInventoryDumper(final String databaseType) {
+        PipelineDataSourceWrapper dataSource = null;
+        return InventoryDumperCreatorFactory.getInstance(databaseType)
+                .createInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100), dataSource, new StandardPipelineTableMetaDataLoader(dataSource));
+    }
+    
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index b97eebb219a..c613f92e5b7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 7ca4d4d49eb..b0152f36f90 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Fixture incremental dumper creator.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 3da47584451..dfa4d701596 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
index d099e0ab3a9..d41cde2b172 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
@@ -17,15 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import javax.sql.DataSource;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 
 /**
  * Fixture inventory dumper creator.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
similarity index 91%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
index 3c59c49838e..a6223c54031 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
@@ -18,9 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.test.mock.MockedDataSource;
 import org.junit.Before;
@@ -43,7 +44,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 // TODO use H2 to do real test
-public final class PipelineTableMetaDataLoaderTest {
+public final class StandardPipelineTableMetaDataLoaderTest {
     
     private static final String TEST_CATALOG = "catalog";
     
@@ -115,7 +116,7 @@ public final class PipelineTableMetaDataLoaderTest {
     
     @Test
     public void assertGetTableMetaData() {
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(null, TEST_TABLE);
         assertColumnMetaData(tableMetaData);
         assertPrimaryKeys(tableMetaData.getPrimaryKeyColumns());
@@ -151,6 +152,6 @@ public final class PipelineTableMetaDataLoaderTest {
     @Test(expected = RuntimeException.class)
     public void assertGetTableMetaDataFailure() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException(""));
-        new PipelineTableMetaDataLoader(dataSource).getTableMetaData(null, TEST_TABLE);
+        new StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(null, TEST_TABLE);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index 53638b69e49..59b1449474f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
-import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index e5818fb9667..c0097e72f04 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -20,9 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.junit.After;
@@ -48,7 +49,7 @@ public final class IncrementalTaskTest {
     public void setUp() {
         TaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index d1a6d3b28e2..03096ef1cad 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,10 +23,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.junit.AfterClass;
@@ -67,7 +68,7 @@ public final class InventoryTaskTest {
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
@@ -82,7 +83,7 @@ public final class InventoryTaskTest {
         // TODO use t_order_0, and also others
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator