You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/09/06 06:33:42 UTC

[shardingsphere] branch master updated: Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ae15890a0 Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)
50ae15890a0 is described below

commit 50ae15890a00d89f3799a63279f6ceeb95d032b0
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Tue Sep 6 14:33:33 2022 +0800

    Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer (#20824)
    
    * Add PipelineTaskConfiguration
    
    * Rename TaskConfiguration to MigrationTaskConfiguration
    
    * Decouple InventoryTaskSplitter with MigrationTaskConfiguration
    
    * Add CreateTableConfiguration; Refactor DataSourcePreparer method parameters and job preparer
    
    * Unit test
---
 .../infra/database/type/DatabaseTypeEngine.java    | 10 +++
 .../api/config/CreateTableConfiguration.java}      | 38 +++++-----
 ...uration.java => PipelineTaskConfiguration.java} | 16 +---
 .../{TableName.java => IdentifierName.java}        | 17 ++---
 .../SchemaName.java}                               | 22 +++---
 .../SchemaTableName.java}                          | 16 ++--
 .../data/pipeline/api/metadata/TableName.java      | 40 +---------
 .../data/pipeline/core/api/PipelineJobAPI.java     |  4 +-
 .../DefaultPipelineDataSourceManager.java          |  1 +
 .../metadata/generator/PipelineDDLGenerator.java   | 44 +++++------
 .../core/prepare/InventoryTaskSplitter.java        | 20 ++---
 .../core/prepare/PipelineJobPreparerUtils.java     | 20 ++++-
 .../datasource/AbstractDataSourcePreparer.java     | 86 +++++++++-------------
 .../prepare/datasource/DataSourcePreparer.java     |  3 +-
 .../datasource/PrepareTargetSchemasParameter.java  | 13 +---
 .../datasource/PrepareTargetTablesParameter.java   | 33 +--------
 .../pipeline/scenario/migration/MigrationJob.java  |  3 +-
 .../scenario/migration/MigrationJobAPI.java        |  4 +
 .../scenario/migration/MigrationJobAPIImpl.java    | 42 +++++++----
 .../migration/MigrationJobItemContext.java         |  5 +-
 .../scenario/migration/MigrationJobPreparer.java   | 45 +++--------
 .../migration/MigrationTaskConfiguration.java}     | 11 ++-
 .../core/fixture/MigrationJobAPIFixture.java       |  4 +-
 .../datasource/MySQLDataSourcePreparer.java        | 13 ++--
 .../datasource/MySQLDataSourcePreparerTest.java    |  1 -
 .../datasource/OpenGaussDataSourcePreparer.java    | 17 ++---
 .../datasource/PostgreSQLDataSourcePreparer.java   | 17 ++---
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  8 +-
 .../core/prepare/InventoryTaskSplitterTest.java    |  9 ++-
 .../pipeline/core/task/IncrementalTaskTest.java    |  4 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  4 +-
 .../pipeline/core/util/PipelineContextUtil.java    |  4 +-
 32 files changed, 252 insertions(+), 322 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 08872c7124a..8cf56eb08d0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -154,4 +154,14 @@ public final class DatabaseTypeEngine {
     public static String getDefaultSchemaName(final DatabaseType databaseType, final String databaseName) {
         return databaseType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema() : databaseName.toLowerCase();
     }
+    
+    /**
+     * Get default schema name, if the database type supports schema.
+     *
+     * @param databaseType database type
+     * @return default schema name, or empty if the database type is not schema supported
+     */
+    public static Optional<String> getDefaultSchemaName(final DatabaseType databaseType) {
+        return databaseType instanceof SchemaSupportedDatabaseType ? Optional.of(((SchemaSupportedDatabaseType) databaseType).getDefaultSchema()) : Optional.empty();
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
similarity index 55%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
index df94812d018..fd69f7be447 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
@@ -15,33 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.api.config;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
- * Prepare target schemas parameter.
+ * Create table configuration.
  */
 @RequiredArgsConstructor
 @Getter
-public final class PrepareTargetSchemasParameter {
+@ToString
+public final class CreateTableConfiguration {
     
-    private final List<String> logicTableNames;
+    private final Collection<CreateTableEntry> createTableEntries;
     
-    private final DatabaseType targetDatabaseType;
-    
-    private final String databaseName;
-    
-    private final PipelineDataSourceConfiguration dataSourceConfig;
-    
-    private final PipelineDataSourceManager dataSourceManager;
-    
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    @RequiredArgsConstructor
+    @Getter
+    @ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
+    public static final class CreateTableEntry {
+        
+        private final PipelineDataSourceConfiguration sourceDataSourceConfig;
+        
+        private final SchemaTableName sourceName;
+        
+        private final PipelineDataSourceConfiguration targetDataSourceConfig;
+        
+        private final SchemaTableName targetName;
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
similarity index 68%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
index 67c7187ce24..29af6e31597 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
@@ -17,20 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-
 /**
- * Task configuration.
+ * Pipeline task configuration.
  */
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
-    
-    private final DumperConfiguration dumperConfig;
-    
-    private final ImporterConfiguration importerConfig;
+public interface PipelineTaskConfiguration {
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
similarity index 81%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
index e84c4847807..f67c76ab003 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
@@ -18,27 +18,24 @@
 package org.apache.shardingsphere.data.pipeline.api.metadata;
 
 import lombok.Getter;
-import lombok.NonNull;
 
 import java.util.Objects;
 
 /**
- * Table name.
- * <p>It might be logic table name or actual table name.</p>
+ * Identifier name.
+ * <p>It might be schema name or table name, etc.</p>
  * <p>It's case-insensitive.</p>
  */
 @Getter
-public class TableName {
+public class IdentifierName {
     
-    @NonNull
     private final String original;
     
-    @NonNull
     private final String lowercase;
     
-    public TableName(final String tableName) {
-        this.original = tableName;
-        this.lowercase = tableName.toLowerCase();
+    public IdentifierName(final String identifierName) {
+        this.original = identifierName;
+        this.lowercase = null != identifierName ? identifierName.toLowerCase() : null;
     }
     
     // TODO table name case-sensitive for some database
@@ -50,7 +47,7 @@ public class TableName {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        final TableName tableName = (TableName) o;
+        final IdentifierName tableName = (IdentifierName) o;
         return lowercase.equals(tableName.lowercase);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
similarity index 64%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
index 67c7187ce24..a9195f2020a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
@@ -15,22 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import javax.annotation.Nullable;
 
 /**
- * Task configuration.
+ * Schema name.
+ * <p>The schema name might be null.</p>
+ * <p>It's case-insensitive.</p>
  */
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
+public class SchemaName extends IdentifierName {
     
-    private final DumperConfiguration dumperConfig;
-    
-    private final ImporterConfiguration importerConfig;
+    public SchemaName(@Nullable final String schemaName) {
+        super(schemaName);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index 67c7187ce24..f3f7dc7fbd4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -15,22 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
 
 import lombok.Getter;
+import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 
 /**
- * Task configuration.
+ * Schema name and table name.
  */
-@Getter
 @RequiredArgsConstructor
+@Getter
 @ToString
-public final class TaskConfiguration {
+public class SchemaTableName {
     
-    private final DumperConfiguration dumperConfig;
+    @NonNull
+    private final SchemaName schemaName;
     
-    private final ImporterConfiguration importerConfig;
+    @NonNull
+    private final TableName tableName;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
index e84c4847807..b48332797dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
@@ -17,50 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.api.metadata;
 
-import lombok.Getter;
 import lombok.NonNull;
 
-import java.util.Objects;
-
 /**
  * Table name.
  * <p>It might be logic table name or actual table name.</p>
  * <p>It's case-insensitive.</p>
  */
-@Getter
-public class TableName {
-    
-    @NonNull
-    private final String original;
-    
-    @NonNull
-    private final String lowercase;
-    
-    public TableName(final String tableName) {
-        this.original = tableName;
-        this.lowercase = tableName.toLowerCase();
-    }
-    
-    // TODO table name case-sensitive for some database
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TableName tableName = (TableName) o;
-        return lowercase.equals(tableName.lowercase);
-    }
-    
-    @Override
-    public int hashCode() {
-        return Objects.hash(lowercase);
-    }
+public class TableName extends IdentifierName {
     
-    @Override
-    public String toString() {
-        return original;
+    public TableName(@NonNull final String tableName) {
+        super(tableName);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index e083d70efcc..06b4c336435 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
@@ -58,7 +58,7 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
      * @param pipelineProcessConfig pipeline process configuration
      * @return task configuration
      */
-    TaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
     
     /**
      * Build pipeline process context.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
index 66ad7498d3c..ce2a9cbbf33 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
@@ -56,6 +56,7 @@ public final class DefaultPipelineDataSourceManager implements PipelineDataSourc
         }
     }
     
+    // TODO monitor each DataSource close
     /**
      * Close, close cached data source.
      */
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index ecff9462e74..1df7d29a532 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -64,24 +64,24 @@ public final class PipelineDDLGenerator {
      * @param databaseType database type
      * @param sourceDataSource source data source
      * @param schemaName schema name
-     * @param logicTableName table name
-     * @param actualTableName actual table name
+     * @param sourceTableName source table name
+     * @param targetTableName target table name
      * @param parserEngine parser engine
-     * @return DDL
+     * @return DDL SQL
      * @throws SQLException SQL exception 
      */
     public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
-                                   final String schemaName, final String logicTableName, final String actualTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
-        log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, tableName={}", databaseType.getType(), schemaName, logicTableName);
+                                   final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
+        log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
         StringBuilder result = new StringBuilder();
-        for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, actualTableName)) {
-            Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, logicTableName, parserEngine, each);
+        for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
+            Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
         }
         return result.toString();
     }
     
-    private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String logicTableName,
+    private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String targetTableName,
                                       final ShardingSphereSQLParserEngine parserEngine, final String sql) throws SQLException {
         if (sql.trim().isEmpty()) {
             return Optional.empty();
@@ -90,7 +90,7 @@ public final class PipelineDDLGenerator {
         try (Connection connection = dataSource.getConnection()) {
             databaseName = connection.getCatalog();
         }
-        String result = decorateActualSQL(databaseName, logicTableName, parserEngine, sql.trim());
+        String result = decorateActualSQL(databaseName, targetTableName, parserEngine, sql.trim());
         // TODO remove it after set search_path is supported.
         if ("openGauss".equals(databaseType.getType())) {
             return decorateOpenGauss(databaseName, schemaName, result, parserEngine);
@@ -98,24 +98,24 @@ public final class PipelineDDLGenerator {
         return Optional.of(result);
     }
     
-    private String decorateActualSQL(final String databaseName, final String logicTableName, final ShardingSphereSQLParserEngine parserEngine, final String sql) {
+    private String decorateActualSQL(final String databaseName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine, final String sql) {
         QueryContext queryContext = getQueryContext(databaseName, parserEngine, sql);
         SQLStatementContext<?> sqlStatementContext = queryContext.getSqlStatementContext();
         Map<SQLSegment, String> replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
         if (sqlStatementContext instanceof CreateTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
-            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
+            appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
         }
         if (sqlStatementContext instanceof CommentStatementContext) {
-            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+            appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
         }
         if (sqlStatementContext instanceof CreateIndexStatementContext) {
-            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
-            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
+            appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
         }
         if (sqlStatementContext instanceof AlterTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
-            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, targetTableName, sqlStatementContext);
+            appendFromTable(replaceMap, targetTableName, (TableAvailable) sqlStatementContext);
         }
         return doDecorateActualTable(replaceMap, sql);
     }
@@ -125,12 +125,12 @@ public final class PipelineDDLGenerator {
         return new QueryContext(sqlStatementContext, sql, Collections.emptyList());
     }
     
-    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String logicTableName, final SQLStatementContext<?> sqlStatementContext) {
+    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String targetTableName, final SQLStatementContext<?> sqlStatementContext) {
         if (!(sqlStatementContext instanceof TableAvailable) || ((TableAvailable) sqlStatementContext).getTablesContext().getTables().isEmpty()) {
             return;
         }
         TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
-        if (!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
+        if (!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) {
             if (sqlStatementContext instanceof IndexAvailable) {
                 for (IndexSegment each : ((IndexAvailable) sqlStatementContext).getIndexes()) {
                     String logicIndexName = IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue());
@@ -146,10 +146,10 @@ public final class PipelineDDLGenerator {
         }
     }
     
-    private void appendFromTable(final Map<SQLSegment, String> replaceMap, final String logicTableName, final TableAvailable sqlStatementContext) {
+    private void appendFromTable(final Map<SQLSegment, String> replaceMap, final String targetTableName, final TableAvailable sqlStatementContext) {
         for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
-            if (!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
-                replaceMap.put(each.getTableName(), logicTableName);
+            if (!targetTableName.equals(each.getTableName().getIdentifier().getValue())) {
+                replaceMap.put(each.getTableName(), targetTableName);
             }
         }
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index f2e3a02064b..e10415451c0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -66,17 +66,19 @@ import java.util.List;
 @RequiredArgsConstructor
 public final class InventoryTaskSplitter {
     
-    private final PipelineTableMetaDataLoader metaDataLoader;
+    private final PipelineDataSourceWrapper sourceDataSource;
     
-    private final PipelineDataSourceManager dataSourceManager;
+    private final DumperConfiguration dumperConfig;
     
-    private final ExecuteEngine importerExecuteEngine;
+    private final ImporterConfiguration importerConfig;
     
-    private final PipelineDataSourceWrapper sourceDataSource;
+    private final InventoryIncrementalJobItemProgress initProgress;
     
-    private final TaskConfiguration taskConfig;
+    private final PipelineTableMetaDataLoader metaDataLoader;
     
-    private final InventoryIncrementalJobItemProgress initProgress;
+    private final PipelineDataSourceManager dataSourceManager;
+    
+    private final ExecuteEngine importerExecuteEngine;
     
     /**
      * Split inventory data to multi-tasks.
@@ -88,8 +90,8 @@ public final class InventoryTaskSplitter {
         List<InventoryTask> result = new LinkedList<>();
         PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, taskConfig.getDumperConfig())) {
-            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
+        for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
+            result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
                     jobProgressListener));
         }
         return result;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 096b8851510..0a080946ad9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.check.datasource.DataSourceCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
@@ -38,7 +39,11 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionIniti
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
@@ -72,8 +77,9 @@ public final class PipelineJobPreparerUtils {
      *
      * @param databaseType database type
      * @param prepareTargetSchemasParameter prepare target schemas parameter
+     * @throws SQLException if preparing the target schema fails
      */
-    public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParameter) {
+    public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParameter) throws SQLException {
         Optional<DataSourcePreparer> dataSourcePreparer = DataSourcePreparerFactory.getInstance(databaseType);
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
@@ -82,6 +88,18 @@ public final class PipelineJobPreparerUtils {
         dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
     }
     
+    /**
+     * Get SQL parser engine.
+     *
+     * @param targetDatabaseName target database name
+     * @return SQL parser engine
+     */
+    public static ShardingSphereSQLParserEngine getSQLParserEngine(final String targetDatabaseName) {
+        ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+        ShardingSphereDatabase database = metaData.getDatabases().get(targetDatabaseName);
+        return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType().getType());
+    }
+    
     /**
      * Prepare target tables.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index c0172f9c51f..9dcf18e2a8d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,29 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
-import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -56,43 +51,38 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
     
     @Override
-    public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) {
-        Set<String> schemaNames = getSchemaNames(parameter);
-        String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), parameter.getDatabaseName());
-        log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", schemaNames, defaultSchema);
-        PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
-        try (Connection targetConnection = getCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
-            for (String each : schemaNames) {
-                if (each.equalsIgnoreCase(defaultSchema)) {
-                    continue;
-                }
-                String sql = pipelineSQLBuilder.buildCreateSchemaSQL(each);
+    public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter) throws SQLException {
+        DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
+        if (!targetDatabaseType.isSchemaAvailable()) {
+            log.info("prepareTargetSchemas, target database does not support schema, ignore, targetDatabaseType={}", targetDatabaseType);
+            return;
+        }
+        CreateTableConfiguration createTableConfig = parameter.getCreateTableConfig();
+        String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
+        PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
+        Set<String> createdSchemaNames = new HashSet<>();
+        for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
+            String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
+            if (null == targetSchemaName) {
+                continue;
+            }
+            if (targetSchemaName.equalsIgnoreCase(defaultSchema)) {
+                continue;
+            }
+            if (createdSchemaNames.contains(targetSchemaName)) {
+                continue;
+            }
+            try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
+                String sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
                 log.info("prepareTargetSchemas, sql={}", sql);
                 try (Statement statement = targetConnection.createStatement()) {
                     statement.execute(sql);
+                    createdSchemaNames.add(targetSchemaName);
                 } catch (final SQLException ignored) {
                 }
             }
-        } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException("Can not get connection.", ex);
         }
-    }
-    
-    private Set<String> getSchemaNames(final PrepareTargetSchemasParameter parameter) {
-        Set<String> result = new HashSet<>();
-        for (String each : parameter.getLogicTableNames()) {
-            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each);
-            if (null == schemaName) {
-                throw new PipelineJobPrepareFailedException("Can not get schemaName by logic table name " + each);
-            }
-            result.add(schemaName);
-        }
-        return result;
-    }
-    
-    // TODO the invocation is disabled for now, it might be used again for next new feature
-    protected final PipelineDataSourceWrapper getSourceCachedDataSource(final MigrationJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
-        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
+        log.info("prepareTargetSchemas, createdSchemaNames={}, defaultSchema={}", createdSchemaNames, defaultSchema);
     }
     
     protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
@@ -121,18 +111,14 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
     }
     
-    protected final List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) throws SQLException {
+    protected final String getCreateTargetTableSQL(final CreateTableEntry createTableEntry, final PipelineDataSourceManager dataSourceManager,
+                                                   final ShardingSphereSQLParserEngine sqlParserEngine) throws SQLException {
+        DatabaseType databaseType = createTableEntry.getSourceDataSourceConfig().getDatabaseType();
+        DataSource sourceDataSource = dataSourceManager.getDataSource(createTableEntry.getSourceDataSourceConfig());
+        String schemaName = createTableEntry.getSourceName().getSchemaName().getOriginal();
+        String sourceTableName = createTableEntry.getSourceName().getTableName().getOriginal();
+        String targetTableName = createTableEntry.getTargetName().getTableName().getOriginal();
         PipelineDDLGenerator generator = new PipelineDDLGenerator();
-        List<String> result = new LinkedList<>();
-        for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
-            String dataSourceName = each.getDataNodes().get(0).getDataSourceName();
-            DataSource dataSource = parameter.getSourceDataSourceMap().get(dataSourceName);
-            DatabaseType databaseType = DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
-            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            String actualTableName = each.getDataNodes().get(0).getTableName();
-            Preconditions.checkNotNull(actualTableName, "Could not get actualTableName, nodeEntry={}", each);
-            result.add(generator.generateLogicDDL(databaseType, dataSource, schemaName, each.getLogicTableName(), actualTableName, parameter.getSqlParserEngine()));
-        }
-        return result;
+        return generator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index e4ef48ab384..501765108aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -32,8 +32,9 @@ public interface DataSourcePreparer extends TypedSPI {
      * Prepare target schemas.
      *
      * @param parameter prepare target schemas parameter
+     * @throws SQLException if preparing the target schemas fails
      */
-    void prepareTargetSchemas(PrepareTargetSchemasParameter parameter);
+    void prepareTargetSchemas(PrepareTargetSchemasParameter parameter) throws SQLException;
     
     /**
      * Prepare target tables.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index df94812d018..6f70d1c8b49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
-import java.util.List;
-
 /**
  * Prepare target schemas parameter.
  */
@@ -33,15 +30,9 @@ import java.util.List;
 @Getter
 public final class PrepareTargetSchemasParameter {
     
-    private final List<String> logicTableNames;
-    
     private final DatabaseType targetDatabaseType;
     
-    private final String databaseName;
-    
-    private final PipelineDataSourceConfiguration dataSourceConfig;
+    private final CreateTableConfiguration createTableConfig;
     
     private final PipelineDataSourceManager dataSourceManager;
-    
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index c9f1e8fc764..65fec5e55ef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -18,46 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 import lombok.Getter;
-import lombok.NonNull;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 
-import javax.sql.DataSource;
-import java.util.Map;
-
 /**
  * Prepare target tables parameter.
  */
+@RequiredArgsConstructor
 @Getter
 public final class PrepareTargetTablesParameter {
     
-    private final String databaseName;
-    
-    private final JobDataNodeLine tablesFirstDataNodes;
-    
-    private final PipelineDataSourceConfiguration targetDataSourceConfig;
-    
-    private final Map<String, DataSource> sourceDataSourceMap;
+    private final CreateTableConfiguration createTableConfig;
     
     private final PipelineDataSourceManager dataSourceManager;
     
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
-    
     private final ShardingSphereSQLParserEngine sqlParserEngine;
-    
-    public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
-                                        @NonNull final Map<String, DataSource> sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
-                                        @NonNull final JobDataNodeLine tablesFirstDataNodes, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
-                                        @NonNull final ShardingSphereSQLParserEngine sqlParserEngine) {
-        this.databaseName = databaseName;
-        this.targetDataSourceConfig = targetDataSourceConfig;
-        this.sourceDataSourceMap = sourceDataSourceMap;
-        this.tablesFirstDataNodes = tablesFirstDataNodes;
-        this.dataSourceManager = dataSourceManager;
-        this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
-        this.sqlParserEngine = sqlParserEngine;
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 2dd261b8420..e1737a49eef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -68,7 +67,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
         MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
         InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
-        TaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
         MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
         if (getTasksRunnerMap().containsKey(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 1a3d902efe2..383cf7e9581 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
@@ -37,6 +38,9 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
     @Override
     MigrationJobConfiguration getJobConfiguration(String jobId);
     
+    @Override
+    MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+    
     @Override
     MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index bd898dc5d9a..4be428d86ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -23,9 +23,10 @@ import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -48,6 +49,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -191,22 +195,32 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     }
     
     @Override
-    public TaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+    public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), new LogicTableName(jobConfig.getTargetTableName()));
         Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), Collections.singletonList(jobConfig.getTargetTableName()));
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
-        DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
+        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig);
+        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
         // TODO now shardingColumnsMap always empty,
-        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
-        TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
-        log.info("createTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
+        MigrationTaskConfiguration result = new MigrationTaskConfiguration(createTableConfig, dumperConfig, importerConfig);
+        log.info("buildTaskConfiguration, sourceResourceName={}, result={}", jobConfig.getSourceResourceName(), result);
         return result;
     }
     
-    private static DumperConfiguration createDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
-                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
+        String sourceSchemaName = jobConfig.getSourceSchemaName();
+        String targetSchemaName = DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
+        CreateTableEntry createTableEntry = new CreateTableEntry(
+                jobConfig.getSource(), new SchemaTableName(new SchemaName(sourceSchemaName), new TableName(jobConfig.getSourceTableName())),
+                jobConfig.getTarget(), new SchemaTableName(new SchemaName(targetSchemaName), new TableName(jobConfig.getTargetTableName())));
+        return new CreateTableConfiguration(Collections.singletonList(createTableEntry));
+    }
+    
+    private DumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
+                                                         final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         DumperConfiguration result = new DumperConfiguration();
         result.setJobId(jobId);
         result.setDataSourceName(dataSourceName);
@@ -216,18 +230,17 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         return result;
     }
     
-    private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                                     final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+    private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+                                                             final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
         PipelineProcessContext migrationProcessContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
+        return new ImporterConfiguration(jobConfig.getTarget(), unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
                 retryTimes, concurrency);
     }
     
-    private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    private Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
         Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
         for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
             result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
@@ -343,10 +356,9 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         String targetTableName = jobConfig.getTargetTableName();
         // TODO use jobConfig.targetSchemaName
         String targetSchemaName = jobConfig.getSourceSchemaName();
-        PipelineDataSourceConfiguration target = jobConfig.getTarget();
         PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
         try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
+                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             String sql = pipelineSQLBuilder.buildTruncateSQL(targetSchemaName, targetTableName);
             log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, targetTableName, sql);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 9a72f14c16c..2904932b9e0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -23,7 +23,6 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -56,7 +55,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final InventoryIncrementalJobItemProgress initProgress;
     
-    private final TaskConfiguration taskConfig;
+    private final MigrationTaskConfiguration taskConfig;
     
     private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
     
@@ -85,7 +84,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     };
     
     public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress,
-                                   final MigrationProcessContext jobProcessContext, final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
+                                   final MigrationProcessContext jobProcessContext, final MigrationTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c02cd51ea0f..63a94777d93 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -18,11 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -43,17 +40,11 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Migration job preparer.
@@ -141,41 +132,27 @@ public final class MigrationJobPreparer {
     
     private void prepareTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
         String targetDatabaseType = jobConfig.getTargetDatabaseType();
-        if (isTargetSchemaAvailable(jobConfig) && StringUtils.isNotBlank(jobConfig.getSourceSchemaName())) {
-            PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(Collections.singletonList(jobConfig.getTargetTableName()),
-                    DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getTargetDatabaseName(),
-                    jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
-            PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
-        }
-        ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
-        ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
-        ShardingSphereSQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
-        JobDataNodeLine jobDataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
-        PipelineDataSourceWrapper dataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
-        Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
-        sourceDataSourceMap.put(jobConfig.getSourceResourceName(), dataSource);
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
-                jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sourceDataSourceMap, jobItemContext.getDataSourceManager(),
-                jobDataNodeLine, tableNameSchemaNameMapping, sqlParserEngine);
+        CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
+        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(
+                DatabaseTypeFactory.getInstance(targetDatabaseType), createTableConfig, jobItemContext.getDataSourceManager());
+        PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
+        ShardingSphereSQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(createTableConfig, jobItemContext.getDataSourceManager(), sqlParserEngine);
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
     }
     
-    private boolean isTargetSchemaAvailable(final MigrationJobConfiguration jobConfig) {
-        return DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable();
-    }
-    
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
-        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
-                jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
+        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(
+                jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+                jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
     
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) throws SQLException {
         PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         ExecuteEngine incrementalDumperExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
-        TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
         JobItemIncrementalTasksProgress initIncremental = jobItemContext.getInitProgress() == null ? null : jobItemContext.getInitProgress().getIncremental();
         taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
similarity index 69%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 67c7187ce24..52672769aab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -15,20 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 
 /**
- * Task configuration.
+ * Migration task configuration.
  */
 @Getter
 @RequiredArgsConstructor
 @ToString
-public final class TaskConfiguration {
+public final class MigrationTaskConfiguration implements PipelineTaskConfiguration {
+    
+    private final CreateTableConfiguration createTableConfig;
     
     private final DumperConfiguration dumperConfig;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index d68a2eb44f0..9608049f057 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
@@ -31,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 
@@ -153,7 +153,7 @@ public final class MigrationJobAPIFixture implements MigrationJobAPI {
     }
     
     @Override
-    public TaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+    public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         return null;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index ceccccf1742..9b7f0d2cde0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
@@ -33,14 +33,13 @@ import java.sql.SQLException;
 public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
     @Override
-    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
+    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
         PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
-        try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
-            for (String each : listCreateLogicalTableSQL(parameter)) {
-                executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
+        for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+            String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+            try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+                executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL));
             }
-        } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 0112176b321..7884a10dbb7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -83,7 +83,6 @@ public final class MySQLDataSourcePreparerTest {
         when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
-        when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
     }
     
     @Test
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index d58133902b0..78b6fef6559 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
 import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.stream.Collectors;
 
 /**
@@ -37,15 +37,14 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
     
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
-        List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
-            for (String createLogicalTableSQL : createLogicalTableSQLs) {
-                for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
-                    executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
+        PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+        for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+            String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+            try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+                for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
+                    executeTargetTableSQL(targetConnection, sql);
                 }
             }
-        } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 8f75e968183..aa30e1c380c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
 import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.stream.Collectors;
 
 /**
@@ -37,15 +37,14 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
     
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) throws SQLException {
-        List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = getCachedDataSource(parameter.getTargetDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
-            for (String createLogicalTableSQL : createLogicalTableSQLs) {
-                for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
-                    executeTargetTableSQL(targetConnection, each);
+        PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+        for (CreateTableEntry each : parameter.getCreateTableConfig().getCreateTableEntries()) {
+            String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, parameter.getSqlParserEngine());
+            try (Connection targetConnection = getCachedDataSource(each.getTargetDataSourceConfig(), dataSourceManager).getConnection()) {
+                for (String sql : Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
+                    executeTargetTableSQL(targetConnection, sql);
                 }
             }
-        } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
         }
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f64fde18e31..9141e76717b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.api.impl;
 
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -34,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.BeforeClass;
@@ -127,13 +127,13 @@ public final class GovernanceRepositoryAPIImplTest {
     
     private MigrationJobItemContext mockJobItemContext() {
         MigrationJobItemContext result = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-        TaskConfiguration taskConfig = result.getTaskConfig();
+        MigrationTaskConfiguration taskConfig = result.getTaskConfig();
         result.getInventoryTasks().add(mockInventoryTask(taskConfig));
         result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
         return result;
     }
     
-    private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
+    private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) {
         InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
         dumperConfig.setPosition(new PlaceholderPosition());
         dumperConfig.setActualTableName("t_order");
@@ -147,7 +147,7 @@ public final class GovernanceRepositoryAPIImplTest {
                 new DefaultPipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
     
-    private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
+    private IncrementalTask mockIncrementalTask(final MigrationTaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 27f35e6442b..55337499309 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare;
 
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -27,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -45,7 +45,7 @@ public final class InventoryTaskSplitterTest {
     
     private MigrationJobItemContext jobItemContext;
     
-    private TaskConfiguration taskConfig;
+    private MigrationTaskConfiguration taskConfig;
     
     private PipelineDataSourceManager dataSourceManager;
     
@@ -59,8 +59,9 @@ public final class InventoryTaskSplitterTest {
     @Before
     public void setUp() {
         initJobItemContext();
-        inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
-                jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
+        inventoryTaskSplitter = new InventoryTaskSplitter(
+                jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+                jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
     }
     
     private void initJobItemContext() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index c0097e72f04..076766b8a6a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.task;
 
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -26,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPr
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class IncrementalTaskTest {
     
     @Before
     public void setUp() {
-        TaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
+        MigrationTaskConfiguration taskConfig = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 03096ef1cad..7f9f60caec0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.task;
 
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -30,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class InventoryTaskTest {
     
     private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new DefaultPipelineDataSourceManager();
     
-    private TaskConfiguration taskConfig;
+    private MigrationTaskConfiguration taskConfig;
     
     @BeforeClass
     public static void beforeClass() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 56409e6fbd1..d98bc93950c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -32,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.Memory
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -159,7 +159,7 @@ public final class PipelineContextUtil {
         PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
-        TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
+        MigrationTaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
         return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
     }