Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/02/10 10:17:28 UTC

[shardingsphere] branch master updated: Pipeline job support any type of single column unique key table (#24108)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c6309070a84 Pipeline job support any type of single column unique key table (#24108)
c6309070a84 is described below

commit c6309070a847c3ca12939167805ca44e421ecdeb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Feb 10 18:17:08 2023 +0800

    Pipeline job support any type of single column unique key table (#24108)
    
    * Pipeline job support any type of single column unique key table
    
    * Add starting and finished log in PipelineWatcher
    
    * Refactor REFRESH TABLE METADATA invocation
    
    * Refactor containerComposer.start()
    
    * Refactor NoUniqueKeyMigrationE2EIT to IndexesMigrationE2EIT; Refactor mysql.xml to none.xml
    
    * Support no unique key migration consistency check
    
    * Add special type single column unique key migration E2E
    
    * Fix unit test
---
 .../api/ingest/position/NoUniqueKeyPosition.java   |  3 --
 .../api/ingest/position/PrimaryKeyPosition.java    |  2 +-
 .../ingest/position/PrimaryKeyPositionFactory.java | 22 +++++----
 .../ingest/position/UnsupportedKeyPosition.java}   | 34 +++++++++----
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  6 +--
 .../core/ingest/dumper/InventoryDumper.java        | 51 ++++----------------
 .../YamlJobItemInventoryTasksProgressSwapper.java  |  9 ++--
 .../core/prepare/InventoryTaskSplitter.java        | 16 +++----
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  8 ++--
 .../fixture/FixturePipelineSQLBuilder.java         |  4 +-
 .../MigrationDataConsistencyChecker.java           |  6 ---
 .../pipeline/cases/base/PipelineBaseE2EIT.java     |  2 -
 .../general/MySQLMigrationGeneralE2EIT.java        |  1 +
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  1 +
 ...rationE2EIT.java => IndexesMigrationE2EIT.java} | 56 ++++++++++++++++++----
 .../container/compose/NativeContainerComposer.java |  5 --
 .../framework/param/PipelineTestParameter.java     |  1 +
 .../framework/watcher/PipelineWatcher.java         |  7 +++
 .../resources/env/common/migration-command.xml     |  6 +--
 .../none_primary_key/mysql.xml => common/none.xml} |  8 +---
 .../ingest/position/NoUniqueKeyPositionTest.java   | 45 +++++++----------
 .../position/UnsupportedKeyPositionTest.java       | 41 ++++++++++++++++
 .../core/prepare/InventoryTaskSplitterTest.java    |  8 ++--
 23 files changed, 188 insertions(+), 154 deletions(-)
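
For context on the diffs below: inventory positions are persisted as comma-separated strings of the form "<type>,<beginValue>,<endValue>", where the type character selects the position class: 'i' for integer keys, 's' for string keys, 'n' for no unique key, and (new in this commit) 'u' for unsupported key types. A minimal round-trip sketch, assuming only the classes shown in this diff:

    import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
    
    public final class PositionRoundTripSketch {
        
        public static void main(final String[] args) {
            // 'i' restores an IntegerPrimaryKeyPosition covering the range [100, 200].
            IngestPosition<?> integerPosition = PrimaryKeyPositionFactory.newInstance("i,100,200");
            // 'u' restores the new UnsupportedKeyPosition; its begin/end values are always null.
            IngestPosition<?> unsupportedPosition = PrimaryKeyPositionFactory.newInstance("u,,");
            // toString() renders null begin/end values as empty strings, so the form round-trips.
            assert "u,,".equals(unsupportedPosition.toString());
        }
    }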

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
index 911842bf9b2..01dd783d010 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
@@ -17,12 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 
-import lombok.RequiredArgsConstructor;
-
 /**
  * No unique key position.
  */
-@RequiredArgsConstructor
 public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<NoUniqueKeyPosition> {
     
     @Override
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
index 6f79125bc67..e690e1acad8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
@@ -42,6 +42,6 @@ public abstract class PrimaryKeyPosition<T> {
     
     @Override
     public final String toString() {
-        return String.format("%s,%s,%s", getType(), getBeginValue(), getEndValue());
+        return String.format("%s,%s,%s", getType(), null != getBeginValue() ? getBeginValue() : "", null != getEndValue() ? getEndValue() : "");
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index da318934ade..06d20776135 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -18,7 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 
 import com.google.common.base.Preconditions;
-import lombok.NonNull;
+import com.google.common.base.Splitter;
+
+import java.util.List;
 
 /**
  * Primary key position factory.
@@ -32,12 +34,12 @@ public final class PrimaryKeyPositionFactory {
      * @return primary key position
      */
     public static IngestPosition<?> newInstance(final String data) {
-        String[] array = data.split(",");
-        Preconditions.checkArgument(3 == array.length, "Unknown primary key position: " + data);
-        Preconditions.checkArgument(1 == array[0].length(), "Invalid primary key position type: " + array[0]);
-        char type = array[0].charAt(0);
-        String beginValue = array[1];
-        String endValue = array[2];
+        List<String> parts = Splitter.on(',').splitToList(data);
+        Preconditions.checkArgument(3 == parts.size(), "Unknown primary key position: " + data);
+        Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid primary key position type: " + parts.get(0));
+        char type = parts.get(0).charAt(0);
+        String beginValue = parts.get(1);
+        String endValue = parts.get(2);
         switch (type) {
             case 'i':
                 return new IntegerPrimaryKeyPosition(Long.parseLong(beginValue), Long.parseLong(endValue));
@@ -45,6 +47,8 @@ public final class PrimaryKeyPositionFactory {
                 return new StringPrimaryKeyPosition(beginValue, endValue);
             case 'n':
                 return new NoUniqueKeyPosition();
+            case 'u':
+                return new UnsupportedKeyPosition();
             default:
                 throw new IllegalArgumentException("Unknown primary key position type: " + type);
         }
@@ -57,13 +61,13 @@ public final class PrimaryKeyPositionFactory {
      * @param endValue end value
      * @return ingest position
      */
-    public static IngestPosition<?> newInstance(final @NonNull Object beginValue, final @NonNull Object endValue) {
+    public static IngestPosition<?> newInstance(final Object beginValue, final Object endValue) {
         if (beginValue instanceof Number) {
             return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), ((Number) endValue).longValue());
         }
         if (beginValue instanceof CharSequence) {
             return new StringPrimaryKeyPosition(beginValue.toString(), endValue.toString());
         }
-        throw new IllegalArgumentException("Unknown begin value type: " + beginValue.getClass().getName());
+        return new UnsupportedKeyPosition();
     }
 }
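
Note the changed fallback in newInstance(Object, Object) above: with @NonNull dropped and the throw removed, a begin value that is neither a Number nor a CharSequence now yields an UnsupportedKeyPosition placeholder instead of an IllegalArgumentException. A minimal sketch of the new behavior:

    import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.UnsupportedKeyPosition;
    
    public final class UnsupportedKeyFallbackSketch {
        
        public static void main(final String[] args) {
            // A VARBINARY unique key arrives as byte[], which is neither Number nor CharSequence.
            byte[] binaryKeyValue = {0x01, 0x02};
            IngestPosition<?> position = PrimaryKeyPositionFactory.newInstance(binaryKeyValue, binaryKeyValue);
            // Before this commit the call threw; now the table can still be dumped without key-based splitting.
            assert position instanceof UnsupportedKeyPosition;
        }
    }
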
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
similarity index 53%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
index 86ce80c12c8..845ee2581c6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
@@ -15,19 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.exception.data;
-
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+package org.apache.shardingsphere.data.pipeline.api.ingest.position;
 
 /**
- * Unsupported pipeline job unique key data type exception.
+ * Unsupported key position.
  */
-public final class UnsupportedPipelineJobUniqueKeyDataTypeException extends PipelineSQLException {
+public final class UnsupportedKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<UnsupportedKeyPosition> {
+    
+    @Override
+    public Void getBeginValue() {
+        return null;
+    }
+    
+    @Override
+    public Void getEndValue() {
+        return null;
+    }
+    
+    @Override
+    protected Void convert(final String value) {
+        throw new UnsupportedOperationException();
+    }
     
-    private static final long serialVersionUID = -1605633809724671592L;
+    @Override
+    protected char getType() {
+        return 'u';
+    }
     
-    public UnsupportedPipelineJobUniqueKeyDataTypeException(final int uniqueKeyDataType) {
-        super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 32, String.format("Unsupported data type `%s` of unique key for pipeline job.", uniqueKeyDataType));
+    @Override
+    public int compareTo(final UnsupportedKeyPosition position) {
+        return 0;
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 5406f5cbc8f..edff4ca1d18 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -47,10 +47,9 @@ public interface PipelineSQLBuilder extends TypedSPI {
      * @param tableName table name
      * @param uniqueKey unique key
      * @param uniqueKeyDataType unique key data type
-     * @param firstQuery whether it's the first time query
      * @return divisible inventory dump SQL
      */
-    String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType, boolean firstQuery);
+    String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType);
     
     /**
      * Build indivisible inventory dump first SQL.
@@ -59,10 +58,9 @@ public interface PipelineSQLBuilder extends TypedSPI {
      * @param tableName table name
      * @param uniqueKey unique key
      * @param uniqueKeyDataType unique key data type
-     * @param firstQuery whether it's the first time query
      * @return indivisible inventory dump SQL
      */
-    String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType, boolean firstQuery);
+    String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType);
     
     /**
      * Build inventory dump all SQL.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index c1621b5171e..218c293d07c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -37,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineJobUniqueKeyDataTypeException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -54,7 +53,6 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Optional;
 
 /**
  * Inventory dumper.
@@ -90,25 +88,15 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     @Override
     protected void runBlocking() {
-        String firstSQL = buildInventoryDumpSQL(true);
-        String laterSQL = buildInventoryDumpSQL(false);
         IngestPosition<?> position = dumperConfig.getPosition();
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
         }
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
-        Object beginUniqueKeyValue = ((PrimaryKeyPosition<?>) position).getBeginValue();
-        int round = 1;
         try (Connection connection = dataSource.getConnection()) {
-            Optional<Object> maxUniqueKeyValue;
-            while ((maxUniqueKeyValue = dump(tableMetaData, connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
-                beginUniqueKeyValue = maxUniqueKeyValue.get();
-                if (!isRunning()) {
-                    break;
-                }
-            }
-            log.info("Inventory dump done, round={}, maxUniqueKeyValue={}.", round, maxUniqueKeyValue);
+            dump(tableMetaData, connection, buildInventoryDumpSQL(), ((PrimaryKeyPosition<?>) position).getBeginValue());
+            log.info("Inventory dump done");
         } catch (final SQLException ex) {
             log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException(ex);
@@ -117,69 +105,48 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         }
     }
     
-    private String buildInventoryDumpSQL(final boolean firstQuery) {
+    private String buildInventoryDumpSQL() {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
         if (null == dumperConfig.getUniqueKey()) {
             return sqlBuilder.buildInventoryDumpAllSQL(schemaName, dumperConfig.getActualTableName());
         }
         if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
-            return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), firstQuery);
-        }
-        if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
-            return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), firstQuery);
+            return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType());
         }
-        throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
+        return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType());
     }
     
-    private Optional<Object> dump(final PipelineTableMetaData tableMetaData, final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
+    private void dump(final PipelineTableMetaData tableMetaData, final Connection connection, final String sql, final Object beginUniqueKeyValue) throws SQLException {
         if (null != dumperConfig.getRateLimitAlgorithm()) {
             dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
         }
         int batchSize = dumperConfig.getBatchSize();
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
             dumpStatement = preparedStatement;
-            setParameters(preparedStatement, batchSize, beginUniqueKeyValue);
+            preparedStatement.setFetchSize(batchSize);
+            setParameters(preparedStatement, beginUniqueKeyValue);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-                int rowCount = 0;
-                Object maxUniqueKeyValue = null;
                 while (resultSet.next()) {
                     channel.pushRecord(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
-                    rowCount++;
-                    if (null != dumperConfig.getUniqueKey()) {
-                        maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(dumperConfig.getUniqueKey()).getOrdinalPosition());
-                    }
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not running.");
                         break;
                     }
                 }
-                if (0 == round % 50) {
-                    log.info("Dumping, round={}, rowCount={}, maxUniqueKeyValue={}.", round, rowCount, maxUniqueKeyValue);
-                }
                 dumpStatement = null;
-                return Optional.ofNullable(maxUniqueKeyValue);
             }
         }
     }
     
-    private void setParameters(final PreparedStatement preparedStatement, final int batchSize, final Object beginUniqueKeyValue) throws SQLException {
-        preparedStatement.setFetchSize(batchSize);
+    private void setParameters(final PreparedStatement preparedStatement, final Object beginUniqueKeyValue) throws SQLException {
         if (null == dumperConfig.getUniqueKey()) {
             return;
         }
         if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
             preparedStatement.setObject(1, beginUniqueKeyValue);
             preparedStatement.setObject(2, ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
-            preparedStatement.setInt(3, batchSize);
-            return;
-        }
-        if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
-            preparedStatement.setObject(1, beginUniqueKeyValue);
-            preparedStatement.setInt(2, batchSize);
-            return;
         }
-        throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
     }
     
     private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
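
The dumper refactoring above replaces the multi-round, LIMIT-paged loop with a single forward-only query: the key range (if any) is bound once, setFetchSize(batchSize) hints the driver to stream rows, and records are pushed to the channel until the result set is exhausted or the task stops. A standalone sketch of the same streaming pattern, with an illustrative URL, table, and key (whether the driver truly streams on fetch size is driver-specific):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    public final class StreamingDumpSketch {
        
        public static void main(final String[] args) throws SQLException {
            try (
                    Connection connection = DriverManager.getConnection("jdbc:h2:mem:demo", "sa", "");
                    PreparedStatement statement = connection.prepareStatement(
                            "SELECT * FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC",
                            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                // Bind the whole range once; no LIMIT, no re-query per batch.
                statement.setFetchSize(1000);
                statement.setObject(1, 0L);
                statement.setObject(2, 10_000L);
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        // Push each row downstream, as the dumper does via channel.pushRecord(...).
                    }
                }
            }
        }
    }
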
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index add094b8604..6b3af441973 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -54,13 +55,13 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
     private String[] getFinished(final JobItemInventoryTasksProgress progress) {
         return progress.getInventoryTaskProgressMap().entrySet().stream()
                 .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
-                .map(Map.Entry::getKey).toArray(String[]::new);
+                .map(Entry::getKey).toArray(String[]::new);
     }
     
     private Map<String, String> getUnfinished(final JobItemInventoryTasksProgress progress) {
         return progress.getInventoryTaskProgressMap().entrySet().stream()
                 .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
-                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition().toString()));
+                .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString()));
     }
     
     /**
@@ -75,11 +76,11 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
         }
         Map<String, InventoryTaskProgress> taskProgressMap = new LinkedHashMap<>();
         taskProgressMap.putAll(Arrays.stream(yamlProgress.getFinished()).collect(Collectors.toMap(key -> key, value -> new InventoryTaskProgress(new FinishedPosition()))));
-        taskProgressMap.putAll(yamlProgress.getUnfinished().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, getInventoryTaskProgressFunction())));
+        taskProgressMap.putAll(yamlProgress.getUnfinished().entrySet().stream().collect(Collectors.toMap(Entry::getKey, getInventoryTaskProgressFunction())));
         return new JobItemInventoryTasksProgress(taskProgressMap);
     }
     
-    private Function<Map.Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
+    private Function<Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
         return entry -> new InventoryTaskProgress(
                 Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPositionFactory.newInstance(entry.getValue()));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 326d376f561..49c2e98ac5c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -156,12 +155,9 @@ public final class InventoryTaskSplitter {
         }
         int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
         if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
-            return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
+            return getPositionByIntegerUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
         }
-        if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
-            return getPositionByStringPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
-        }
-        throw new SplitPipelineJobByRangeException(dumperConfig.getActualTableName(), "primary key is not integer or string type");
+        return getPositionByStringUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
     }
     
     private Collection<IngestPosition<?>> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
@@ -189,8 +185,8 @@ public final class InventoryTaskSplitter {
         }
     }
     
-    private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                              final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition<?>> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                             final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new LinkedList<>();
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
@@ -229,8 +225,8 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                             final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition<?>> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+                                                                            final InventoryDumperConfiguration dumperConfig) {
         long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         Collection<IngestPosition<?>> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 8928e401973..ac3455e9b74 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -71,17 +71,17 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     protected abstract String getRightIdentifierQuoteString();
     
     @Override
-    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT * FROM %s WHERE %s%s? AND %s<=? ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey, firstQuery ? ">=" : ">", quotedUniqueKey, quotedUniqueKey);
+        return String.format("SELECT * FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
     }
     
     @Override
-    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT * FROM %s WHERE %s%s? ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey, firstQuery ? ">=" : ">", quotedUniqueKey);
+        return String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey);
     }
     
     protected final String getQualifiedTableName(final String schemaName, final String tableName) {
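
For reference, the refactored format strings above expand as follows (identifier quoting omitted and names illustrative):

    public final class DumpSQLSketch {
        
        public static void main(final String[] args) {
            String qualifiedTableName = "ds_0.t_order";
            String uniqueKey = "order_id";
            // Divisible dump (integer unique key): the split range is bound as two parameters.
            // -> SELECT * FROM ds_0.t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC
            System.out.printf("SELECT * FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC%n", qualifiedTableName, uniqueKey, uniqueKey, uniqueKey);
            // Indivisible dump (any other single-column unique key): one full scan in key order.
            // -> SELECT * FROM ds_0.t_order ORDER BY order_id ASC
            System.out.printf("SELECT * FROM %s ORDER BY %s ASC%n", qualifiedTableName, uniqueKey);
        }
    }
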
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 3db2baad3f5..08e46bfa21e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -29,12 +29,12 @@ import java.util.Optional;
 public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     
     @Override
-    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
         return "";
     }
     
     @Override
-    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
         return "";
     }
     
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index cf94daf6a76..a62c9cb83e9 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckIgnoredType;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
@@ -80,11 +79,6 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
         progressContext.getTableNames().add(jobConfig.getSourceTableName());
         Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
-        if (null == jobConfig.getUniqueKeyColumn()) {
-            progressContext.getIgnoredTableNames().add(sourceTable.getTableName().getOriginal());
-            result.put(sourceTable.getTableName().getOriginal(), new DataConsistencyCheckResult(DataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
-            return result;
-        }
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 311d40e4337..13c1a671b82 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -114,7 +114,6 @@ public abstract class PipelineBaseE2EIT {
         containerComposer = ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER
                 ? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
                 : new NativeContainerComposer(testParam.getDatabaseType());
-        containerComposer.start();
         if (ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             username = storageContainer.getUsername();
@@ -357,7 +356,6 @@ public abstract class PipelineBaseE2EIT {
     }
     
     protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) throws SQLException {
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         String countSQL = Strings.isNullOrEmpty(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
         Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
         log.info("actual count {}", actual.get("count"));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index bc01c64ba87..540ee00ad0e 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -108,6 +108,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
         List<String> lastJobIds = listJobId();
         assertThat(lastJobIds.size(), is(0));
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT, "");
         log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
     }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d1b44b688a1..d9fd0aa96fe 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -101,6 +101,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         }
         List<String> lastJobIds = listJobId();
         assertThat(lastJobIds.size(), is(0));
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1, PipelineBaseE2EIT.SCHEMA_NAME);
         log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
     }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
similarity index 63%
rename from test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
rename to test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index c3f5489d48d..56145d8ec51 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,12 +20,13 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -42,13 +43,18 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertFalse;
 
+/**
+ * E2E IT for different types of indexes, including:
+ * 1) no unique key;
+ * 2) special type single column unique key.
+ */
 @RunWith(Parameterized.class)
 @Slf4j
-public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
+public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private final PipelineTestParameter testParam;
     
-    public NoUniqueKeyMigrationE2EIT(final PipelineTestParameter testParam) {
+    public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
         this.testParam = testParam;
     }
@@ -60,7 +66,7 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
             return result;
         }
         for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
-            result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/none_primary_key/mysql.xml"));
+            result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/common/none.xml"));
         }
         return result;
     }
@@ -71,12 +77,38 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     }
     
     @Test
-    public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
-        log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+    public void assertNoUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+        String sql;
+        String consistencyCheckAlgorithmType;
+        if (getDatabaseType() instanceof MySQLDatabaseType) {
+            sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+            // DATA_MATCH is not supported: records can not be ordered without a unique key
+            consistencyCheckAlgorithmType = "CRC32_MATCH";
+        } else {
+            return;
+        }
+        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+    }
+    
+    @Test
+    public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+        String sql;
+        String consistencyCheckAlgorithmType;
+        if (getDatabaseType() instanceof MySQLDatabaseType) {
+            sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+            // DATA_MATCH is not supported: the order-by value must implement Comparable
+            consistencyCheckAlgorithmType = "CRC32_MATCH";
+        } else {
+            return;
+        }
+        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+    }
+    
+    private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
         initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
-        createSourceOrderTable();
+        createSourceOrderTable(sqlPattern);
         try (Connection connection = getSourceDataSource().getConnection()) {
-            AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
+            KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
@@ -86,12 +118,16 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+        commitMigrationByJobId(jobId);
         proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
         assertTargetAndSourceCountAreSame();
-        commitMigrationByJobId(jobId);
         List<String> lastJobIds = listJobId();
         assertThat(lastJobIds.size(), is(0));
-        log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+    }
+    
+    private void createSourceOrderTable(final String sqlPattern) throws SQLException {
+        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
     }
     
     private void assertTargetAndSourceCountAreSame() {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
index 4321940f414..e37d8c84510 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
@@ -47,11 +47,6 @@ public final class NativeContainerComposer extends BaseContainerComposer {
         this.databaseType = databaseType;
     }
     
-    @Override
-    public void start() {
-        // do nothing
-    }
-    
     @SneakyThrows(SQLException.class)
     @Override
     public void cleanUpDatabase(final String databaseName) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
index e6ec743e71f..5425b9ee7e6 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
@@ -31,6 +31,7 @@ public final class PipelineTestParameter {
     
     private final String storageContainerImage;
     
+    // TODO It's not a scenario. Remove it later
     private final String scenario;
     
     private final int storageContainerCount;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
index fa765e17ccb..156aaf59385 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
@@ -80,8 +80,15 @@ public class PipelineWatcher extends TestWatcher {
         }
     }
     
+    @Override
+    protected void starting(final Description description) {
+        log.info("starting: {}", description);
+        containerComposer.start();
+    }
+    
     @Override
     protected void finished(final Description description) {
+        log.info("finished: {}", description);
         containerComposer.close();
     }
 }
diff --git a/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml b/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
index cc0a0de9a50..4592ef89ff3 100644
--- a/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
+++ b/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
@@ -67,8 +67,7 @@
         CREATE SHARDING TABLE RULE t_order(
         STORAGE_UNITS(ds_2,ds_3,ds_4),
         SHARDING_COLUMN=user_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6"))
         );
     </create-target-order-table-rule>
     
@@ -76,8 +75,7 @@
         CREATE SHARDING TABLE RULE t_order_item(
         DATANODES('ds_${2..4}.t_order_item_${0..1}'),
         DATABASE_STRATEGY(TYPE='standard',SHARDING_COLUMN=user_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='ds_${user_id % 3 + 2}')))),
-        TABLE_STRATEGY(TYPE='standard',SHARDING_COLUMN=order_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='t_order_item_${order_id % 2}')))),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME='snowflake'))
+        TABLE_STRATEGY(TYPE='standard',SHARDING_COLUMN=order_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='t_order_item_${order_id % 2}'))))
         );
     </create-target-order-item-table-rule>
     
diff --git a/test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml b/test/e2e/pipeline/src/test/resources/env/common/none.xml
similarity index 78%
rename from test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml
rename to test/e2e/pipeline/src/test/resources/env/common/none.xml
index 58434662c39..ce00c632cb3 100644
--- a/test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml
+++ b/test/e2e/pipeline/src/test/resources/env/common/none.xml
@@ -14,12 +14,6 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
+<!-- TODO remove it later -->
 <command>
-    <create-table-order>
-        CREATE TABLE `%s` (
-        `order_id` INT NOT NULL,
-        `user_id` INT NOT NULL,
-        `status` varchar(255) NULL
-        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-    </create-table-order>
 </command>
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
similarity index 50%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
copy to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
index 911842bf9b2..2adec288d17 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
@@ -15,38 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.ingest.position;
+package org.apache.shardingsphere.test.it.data.pipeline.core.ingest.position;
 
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
+import org.junit.Test;
 
-/**
- * No unique key position.
- */
-@RequiredArgsConstructor
-public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<NoUniqueKeyPosition> {
-    
-    @Override
-    public Void getBeginValue() {
-        return null;
-    }
-    
-    @Override
-    public Void getEndValue() {
-        return null;
-    }
-    
-    @Override
-    protected Void convert(final String value) {
-        throw new UnsupportedOperationException();
-    }
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class NoUniqueKeyPositionTest {
     
-    @Override
-    protected char getType() {
-        return 'n';
+    @Test
+    public void assertInit() {
+        NoUniqueKeyPosition position = (NoUniqueKeyPosition) PrimaryKeyPositionFactory.newInstance("n,,");
+        assertNull(position.getBeginValue());
+        assertNull(position.getEndValue());
     }
     
-    @Override
-    public int compareTo(final NoUniqueKeyPosition position) {
-        return 0;
+    @Test
+    public void assertToString() {
+        assertThat(new NoUniqueKeyPosition().toString(), is("n,,"));
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java
new file mode 100644
index 00000000000..01073a68677
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.ingest.position;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.UnsupportedKeyPosition;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class UnsupportedKeyPositionTest {
+    
+    @Test
+    public void assertInit() {
+        UnsupportedKeyPosition position = (UnsupportedKeyPosition) PrimaryKeyPositionFactory.newInstance("u,,");
+        assertNull(position.getBeginValue());
+        assertNull(position.getEndValue());
+    }
+    
+    @Test
+    public void assertToString() {
+        assertThat(new UnsupportedKeyPosition().toString(), is("u,,"));
+    }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 52b48485a2b..58f4796b3f1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -122,15 +121,16 @@ public final class InventoryTaskSplitterTest {
         assertThat(actual.size(), is(1));
     }
     
-    @Test(expected = SplitPipelineJobByRangeException.class)
-    public void assertSplitInventoryDataWithIllegalKeyDataType() throws SQLException, ReflectiveOperationException {
+    @Test
+    public void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException, ReflectiveOperationException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
         InventoryDumperConfiguration dumperConfig = (InventoryDumperConfiguration) Plugins.getMemberAccessor()
                 .get(InventoryTaskSplitter.class.getDeclaredField("dumperConfig"), inventoryTaskSplitter);
         assertNotNull(dumperConfig);
         dumperConfig.setUniqueKey("order_id,user_id");
         dumperConfig.setUniqueKeyDataType(Integer.MIN_VALUE);
-        inventoryTaskSplitter.splitInventoryData(jobItemContext);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
+        assertThat(actual.size(), is(1));
     }
     
     @Test