You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/13 12:22:36 UTC

[shardingsphere] branch master updated: Support unique key table check migration (#20944)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc83e6e0d97 Support unique key table check migration (#20944)
bc83e6e0d97 is described below

commit bc83e6e0d97c13abdb449c3b992be7fd8d6abaf6
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 20:22:28 2022 +0800

    Support unique key table check migration (#20944)
    
    * Support unique key table check migration
    
    * persist unique key in job configuration
    
    * Fix ci error
    
    * Fix codestyle
    
    * Fix codestyle
---
 .../api/config/job/MigrationJobConfiguration.java  |   3 +
 .../job/yaml/YamlMigrationJobConfiguration.java    |   3 +
 .../yaml/YamlMigrationJobConfigurationSwapper.java |   6 +-
 .../metadata/yaml/YamlPipelineColumnMetaData.java  |  40 +++++++
 .../yaml/YamlPipelineColumnMetaDataSwapper.java    |  51 ++++++++
 .../core/prepare/InventoryTaskSplitter.java        |  66 ++---------
 .../core/util/PipelineTableMetaDataUtil.java       | 129 +++++++++++++++++++++
 .../migration/MigrationDataConsistencyChecker.java |   6 +-
 .../scenario/migration/MigrationJobAPIImpl.java    |   6 +
 .../scenario/migration/MigrationJobPreparer.java   |   8 +-
 .../cases/migration/AbstractMigrationITCase.java   |   4 +-
 .../migration/general/MySQLMigrationGeneralIT.java |   4 +-
 .../general/PostgreSQLMigrationGeneralIT.java      |   4 +-
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  13 ++-
 .../env/scenario/primary_key/unique_key/mysql.xml  |  28 +++++
 .../core/api/impl/MigrationJobAPIImplTest.java     |  27 ++++-
 .../core/prepare/InventoryTaskSplitterTest.java    |  29 ++++-
 17 files changed, 350 insertions(+), 77 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index 57be24501b7..cb19ff443e6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
 import java.util.List;
 
@@ -67,6 +68,8 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
     
     private final List<String> jobShardingDataNodes;
     
+    private final PipelineColumnMetaData uniqueKeyColumn;
+    
     private final int concurrency;
     
     private final int retryTimes;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index 917b9f9367e..34d13bc8d97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -23,6 +23,7 @@ import lombok.Setter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
 
 import java.util.List;
 
@@ -66,6 +67,8 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
     
     private List<String> jobShardingDataNodes;
     
+    private YamlPipelineColumnMetaData uniqueKeyColumn;
+    
     private int concurrency = 3;
     
     private int retryTimes = 3;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index 93a1ed0edf8..1fd2e5bf86b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
 
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -32,6 +33,8 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
     
     private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
     
+    private final YamlPipelineColumnMetaDataSwapper pipelineColumnMetaDataSwapper = new YamlPipelineColumnMetaDataSwapper();
+    
     @Override
     public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
@@ -47,6 +50,7 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
         result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
         result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
         result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+        result.setUniqueKeyColumn(pipelineColumnMetaDataSwapper.swapToYamlConfiguration(data.getUniqueKeyColumn()));
         result.setConcurrency(data.getConcurrency());
         result.setRetryTimes(data.getRetryTimes());
         return result;
@@ -59,7 +63,7 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
                 yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
                 yamlConfig.getSourceTableName(), yamlConfig.getTargetTableName(),
                 dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
-                yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
+                yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(), pipelineColumnMetaDataSwapper.swapToObject(yamlConfig.getUniqueKeyColumn()),
                 yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
new file mode 100644
index 00000000000..260e7cf4786
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * Yaml pipeline column meta data.
+ */
+@Data
+public final class YamlPipelineColumnMetaData implements YamlConfiguration {
+    
+    private int ordinalPosition;
+    
+    private String name;
+    
+    private int dataType;
+    
+    private String dataTypeName;
+    
+    private boolean nullable;
+    
+    private boolean primaryKey;
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
new file mode 100644
index 00000000000..8d6064c0684
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * Yaml pipeline column meta data swapper.
+ */
+public final class YamlPipelineColumnMetaDataSwapper implements YamlConfigurationSwapper<YamlPipelineColumnMetaData, PipelineColumnMetaData> {
+    
+    @Override
+    public YamlPipelineColumnMetaData swapToYamlConfiguration(final PipelineColumnMetaData data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineColumnMetaData result = new YamlPipelineColumnMetaData();
+        result.setName(data.getName());
+        result.setDataType(data.getDataType());
+        result.setDataTypeName(data.getDataTypeName());
+        result.setNullable(data.isNullable());
+        result.setPrimaryKey(data.isPrimaryKey());
+        result.setOrdinalPosition(data.getOrdinalPosition());
+        return result;
+    }
+    
+    @Override
+    public PipelineColumnMetaData swapToObject(final YamlPipelineColumnMetaData yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        return new PipelineColumnMetaData(yamlConfig.getOrdinalPosition(), yamlConfig.getName(), yamlConfig.getDataType(), yamlConfig.getDataTypeName(), yamlConfig.isNullable(),
+                yamlConfig.isPrimaryKey());
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 04856d1f356..7cc8a52f8ec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -28,15 +27,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
@@ -68,7 +63,7 @@ public final class InventoryTaskSplitter {
     
     private final PipelineDataSourceWrapper sourceDataSource;
     
-    private final DumperConfiguration dumperConfig;
+    private final InventoryDumperConfiguration dumperConfig;
     
     private final ImporterConfiguration importerConfig;
     
@@ -97,15 +92,15 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final DumperConfiguration dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
-            result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource, metaDataLoader));
+            result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
         }
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitByTable(final DumperConfiguration dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitByTable(final InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         dumperConfig.getTableNameMap().forEach((key, value) -> {
             InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -113,19 +108,21 @@ public final class InventoryTaskSplitter {
             inventoryDumperConfig.setActualTableName(key.getOriginal());
             inventoryDumperConfig.setLogicTableName(value.getOriginal());
             inventoryDumperConfig.setPosition(new PlaceholderPosition());
+            inventoryDumperConfig.setUniqueKey(dumperConfig.getUniqueKey());
+            inventoryDumperConfig.setUniqueKeyDataType(dumperConfig.getUniqueKeyDataType());
             result.add(inventoryDumperConfig);
         });
         return result;
     }
     
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
-                                                                       final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+                                                                       final DataSource dataSource) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
         PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
+        Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
         int i = 0;
         for (IngestPosition<?> inventoryPosition : inventoryPositions) {
             InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -143,58 +140,19 @@ public final class InventoryTaskSplitter {
     }
     
     private Collection<IngestPosition<?>> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
-                                                                final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
-        String actualTableName = dumperConfig.getActualTableName();
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, actualTableName);
-        PipelineColumnMetaData uniqueKeyColumn = mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
+                                                                final DataSource dataSource) {
         if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
-            Collection<IngestPosition<?>> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
-            for (IngestPosition<?> each : result) {
-                if (each instanceof PrimaryKeyPosition) {
-                    dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
-                    dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
-                    break;
-                }
-            }
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
-            return result;
+            return initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
         }
-        dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
-        int uniqueKeyDataType = uniqueKeyColumn.getDataType();
-        dumperConfig.setUniqueKeyDataType(uniqueKeyDataType);
+        int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
         if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
             return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
         } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
             return getPositionByStringPrimaryKeyRange();
         } else {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", actualTableName));
-        }
-    }
-    
-    private PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
-        if (null == tableMetaData) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
-        }
-        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (primaryKeys.size() > 1) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
-        }
-        if (1 == primaryKeys.size()) {
-            return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
-        }
-        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
-        if (uniqueIndexes.isEmpty()) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
-        }
-        if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
-            PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
-            if (!column.isNullable()) {
-                return column;
-            }
+            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", dumperConfig.getActualTableName()));
         }
-        throw new PipelineJobCreationException(
-                String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
     }
     
     private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
new file mode 100644
index 00000000000..f6d04b192e5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Pipeline table meta data util.
+ */
+public final class PipelineTableMetaDataUtil {
+    
+    /**
+     * Get pipeline table meta data.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSourceConfig source configuration
+     * @param loader pipeline table meta data loader* @return pipeline table meta data
+     * @return pipeline table meta data
+     */
+    @SneakyThrows(SQLException.class)
+    public static PipelineTableMetaData getPipelineTableMetaData(final String schemaName, final String tableName, final StandardPipelineDataSourceConfiguration dataSourceConfig,
+                                                                 final PipelineTableMetaDataLoader loader) {
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+            return getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
+        }
+    }
+    
+    /**
+     * Get pipeline table meta data.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSource data source
+     * @param loader pipeline table meta data loader
+     * @return pipeline table meta data.
+     */
+    public static PipelineTableMetaData getPipelineTableMetaData(final String schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+                                                                 final PipelineTableMetaDataLoader loader) {
+        if (null == loader) {
+            return new StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(schemaName, tableName);
+        } else {
+            return loader.getTableMetaData(schemaName, tableName);
+        }
+    }
+    
+    /**
+     * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSourceConfig data source config
+     * @param loader pipeline table meta data loader
+     * @return pipeline column meta data.
+     */
+    public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final StandardPipelineDataSourceConfiguration dataSourceConfig,
+                                                            final StandardPipelineTableMetaDataLoader loader) {
+        PipelineTableMetaData pipelineTableMetaData = getPipelineTableMetaData(schemaName, tableName, dataSourceConfig, loader);
+        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+    }
+    
+    /**
+     * Get unique key column, if primary key exists, return primary key, otherwise return the first unique key.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSource data source
+     * @param loader pipeline table meta data loader
+     * @return pipeline column meta data.
+     */
+    public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+                                                            final StandardPipelineTableMetaDataLoader loader) {
+        PipelineTableMetaData pipelineTableMetaData = getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
+        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+    }
+    
+    private static PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
+        if (null == tableMetaData) {
+            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
+        }
+        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
+        if (primaryKeys.size() > 1) {
+            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
+        }
+        if (1 == primaryKeys.size()) {
+            return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+        }
+        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
+        if (uniqueIndexes.isEmpty()) {
+            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
+        }
+        if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
+            PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
+            if (!column.isNullable()) {
+                return column;
+            }
+        }
+        throw new PipelineJobCreationException(
+                String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index bbd61d295e6..c6061c1eaf4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -32,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineDataConsistencyCheckFailedException;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -174,12 +174,12 @@ public final class MigrationDataConsistencyChecker {
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
             for (String each : Collections.singletonList(sourceTableName)) {
-                PipelineTableMetaData tableMetaData = new StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
+                PipelineTableMetaData tableMetaData = PipelineTableMetaDataUtil.getPipelineTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each, sourceDataSource, null);
                 if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
                 }
                 Collection<String> columnNames = tableMetaData.getColumnNames();
-                PipelineColumnMetaData uniqueKey = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+                PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
                 DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
                 DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, columnNames, targetDatabaseType, sourceDatabaseType,
                         uniqueKey);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 2fa0d392489..e68b75c7a1f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -51,6 +51,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -67,6 +69,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigr
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.DropMigrationSourceResourceException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -465,6 +468,9 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
+        YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName,
+                parameter.getSourceTableName(), sourceDataSourceConfig, null));
+        result.setUniqueKeyColumn(uniqueKeyColumn);
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
         start(jobConfiguration);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index e17a19ff197..99693e5e20a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -26,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
@@ -143,8 +145,12 @@ public final class MigrationJobPreparer {
     }
     
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
+        InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        PipelineColumnMetaData uniqueKeyColumn = jobItemContext.getJobConfig().getUniqueKeyColumn();
+        inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+        inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(
-                jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+                jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
                 jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 05d525474fe..fcdd0f9fd72 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -159,7 +159,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
         return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId) {
+    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
         for (int i = 0; i < 5; i++) {
             if (checkJobIncrementTaskFinished(jobId)) {
                 break;
@@ -168,7 +168,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
         }
         boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
         log.info("second check job result: {}", secondCheckJobResult);
-        List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='DATA_MATCH')", jobId));
+        List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
             assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 3cfcd013495..7c3889a7827 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -42,7 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * General scaling test case, includes multiple cases.
+ * General migration test case, includes multiple cases.
  */
 @Slf4j
 @RunWith(Parameterized.class)
@@ -103,7 +103,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
     
     private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 81b5bed2ba9..e181a40838c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -111,7 +111,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         sourceExecuteWithLog(String.format("INSERT INTO %s.t_order_copy (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, KEY_GENERATE_ALGORITHM.generateKey(),
                 1, "afterStop"));
         startMigrationByJobId(jobId);
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
     
@@ -119,7 +119,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         startMigrationOrderItem(true);
         String jobId = getJobIdByTableName("t_order_item");
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index d9d681514d2..d180deae6a8 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.migration.Abstr
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.integration.env.container.atomic.util.DatabaseTypeUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -38,7 +39,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 @RunWith(Parameterized.class)
@@ -58,6 +59,7 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
         }
         for (String version : ENV.listDatabaseDockerImageNames(new MySQLDatabaseType())) {
             result.add(new ScalingParameterized(new MySQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
+            result.add(new ScalingParameterized(new MySQLDatabaseType(), version, "env/scenario/primary_key/unique_key/mysql.xml"));
         }
         for (String version : ENV.listDatabaseDockerImageNames(new PostgreSQLDatabaseType())) {
             result.add(new ScalingParameterized(new PostgreSQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
@@ -79,8 +81,14 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
         startMigrationOrder();
         String jobId = listJobId().get(0);
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        sourceExecuteWithLog(String.format("INSERT INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, "afterStop"));
+        // TODO The ordering of primary or unique keys for text types is different, may cause check failed, need fix
+        if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
+            assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
+        } else {
+            assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+        }
         stopMigrationByJobId(jobId);
-        assertCheckMigrationSuccess(jobId);
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             commitMigrationByJobId(jobId);
             List<String> lastJobIds = listJobId();
@@ -100,5 +108,6 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
             }
             preparedStatement.executeBatch();
         }
+        log.info("init data succeed");
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
new file mode 100644
index 00000000000..5f8a288d79a
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -0,0 +1,28 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<command>
+    <create-table-order>
+        CREATE TABLE `t_order` (
+        `order_id` varchar(255) NOT NULL,
+        `user_id` INT NOT NULL,
+        `status` varchar(255) NULL,
+        `t_unsigned_int` int UNSIGNED NULL,
+        CONSTRAINT unique_id UNIQUE (order_id),
+        INDEX ( `user_id` )
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    </create-table-order>
+</command>
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 1b17ff77540..edd701f1189 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -35,9 +36,11 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.AfterClass;
@@ -77,8 +80,7 @@ public final class MigrationJobAPIImplTest {
         PipelineContextUtil.mockModeConfigAndContextManager();
         jobAPI = MigrationJobAPIFactory.getInstance();
         Map<String, Object> props = new HashMap<>();
-        // TODO if resource availability is checked, then it should not work
-        props.put("jdbcUrl", "jdbc:mysql://localhost:3306/test");
+        props.put("jdbcUrl", "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
         props.put("username", "root");
         props.put("password", "root");
         Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
@@ -151,8 +153,10 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertDataConsistencyCheck() {
-        Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+    public void assertDataConsistencyCheck() throws NoSuchFieldException, IllegalAccessException {
+        MigrationJobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
+        ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true));
+        Optional<String> jobId = jobAPI.start(jobConfiguration);
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
         if (null == jobConfig.getSource()) {
@@ -266,7 +270,8 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertCreateJobConfig() {
+    public void assertCreateJobConfig() throws SQLException {
+        initIntPrimaryEnvironment();
         CreateMigrationJobParameter parameter = new CreateMigrationJobParameter("ds_0", null, "t_order", "logic_db", "t_order");
         String jobId = jobAPI.createJobAndStart(parameter);
         MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId);
@@ -276,6 +281,18 @@ public final class MigrationJobAPIImplTest {
         assertThat(jobConfig.getTargetTableName(), is("t_order"));
     }
     
+    private void initIntPrimaryEnvironment() throws SQLException {
+        Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(JobType.MIGRATION);
+        DataSourceProperties dataSourceProperties = metaDataDataSource.get("ds_0");
+        DataSource dataSource = DataSourcePoolCreator.create(dataSourceProperties);
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id int(10))");
+        }
+    }
+    
     @Test
     public void assertShowMigrationSourceResources() {
         Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources();
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 28a1b00a4af..06d4a52e159 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -18,13 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.core.prepare;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.junit.After;
@@ -36,9 +41,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 public final class InventoryTaskSplitterTest {
@@ -57,15 +64,19 @@ public final class InventoryTaskSplitterTest {
     }
     
     @Before
-    public void setUp() {
+    public void setUp() throws NoSuchFieldException, IllegalAccessException {
         initJobItemContext();
+        InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        dumperConfig.setUniqueKeyDataType(Types.INTEGER);
+        dumperConfig.setUniqueKey("order_id");
         inventoryTaskSplitter = new InventoryTaskSplitter(
-                jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig().getDumperConfig(), jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
+                jobItemContext.getSourceDataSource(), dumperConfig, jobItemContext.getTaskConfig().getImporterConfig(), jobItemContext.getInitProgress(),
                 jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(), jobItemContext.getJobProcessContext().getImporterExecuteEngine());
     }
     
-    private void initJobItemContext() {
+    private void initJobItemContext() throws NoSuchFieldException, IllegalAccessException {
         MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        ReflectionUtil.setFieldValue(jobConfig, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true));
         jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         dataSourceManager = jobItemContext.getDataSourceManager();
         taskConfig = jobItemContext.getTaskConfig();
@@ -110,14 +121,22 @@ public final class InventoryTaskSplitterTest {
     }
     
     @Test(expected = PipelineJobCreationException.class)
-    public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
+    public void assertSplitInventoryDataWithIllegalKeyDataType() throws SQLException, NoSuchFieldException, IllegalAccessException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
+        InventoryDumperConfiguration dumperConfig = ReflectionUtil.getFieldValue(inventoryTaskSplitter, "dumperConfig", InventoryDumperConfiguration.class);
+        assertNotNull(dumperConfig);
+        dumperConfig.setUniqueKey("order_id,user_id");
+        dumperConfig.setUniqueKeyDataType(Integer.MIN_VALUE);
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
     }
     
     @Test(expected = PipelineJobCreationException.class)
-    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
+    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException, NoSuchFieldException, IllegalAccessException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
+        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig())) {
+            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", dataSource, null);
+            ReflectionUtil.setFieldValue(jobItemContext.getJobConfig(), "uniqueKeyColumn", uniqueKeyColumn);
+        }
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
     }