Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/03/11 01:39:15 UTC

[shardingsphere] branch master updated: Support custom columns in pipeline job (#24546)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e45c23df7 Support custom columns in pipeline job (#24546)
59e45c23df7 is described below

commit 59e45c23df77268b3c8dec935b200d0758438286
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Mar 11 09:38:58 2023 +0800

    Support custom columns in pipeline job (#24546)
---
 .../DataConsistencyCalculateParameter.java         |   4 +-
 .../api/config/ingest/DumperConfiguration.java     |  35 +++++++
 .../ingest/InventoryDumperConfiguration.java       |   1 +
 .../data/pipeline/api/metadata/ColumnName.java}    |  27 ++----
 .../api/metadata/model/PipelineTableMetaData.java  |   1 +
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  13 ++-
 ...SingleTableInventoryDataConsistencyChecker.java |  15 +--
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  13 +--
 .../pipeline/core/importer/DataSourceImporter.java |   3 -
 .../core/ingest/dumper/InventoryDumper.java        |  17 ++--
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  25 +++--
 .../fixture/FixturePipelineSQLBuilder.java         |   8 +-
 .../mysql/ingest/MySQLIncrementalDumper.java       |  28 ++++--
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 107 +++++++++++++++++----
 .../postgresql/ingest/wal/WALEventConverter.java   |  61 ++++++------
 .../ingest/wal/WALEventConverterTest.java          |  91 +++++++++++++++---
 .../MigrationDataConsistencyChecker.java           |   7 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |   2 +-
 .../core/fixture/FixtureColumnValueReader.java     |  11 +--
 .../core/fixture/FixturePipelineSQLBuilder.java    |  10 +-
 .../spi/sqlbuilder/PipelineSQLBuilderTest.java     |  41 ++++++++
 ...ta.pipeline.spi.ingest.dumper.ColumnValueReader |  18 ++++
 22 files changed, 389 insertions(+), 149 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 9fb97eb3f3a..cbfc9eeb87d 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -24,7 +24,7 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
-import java.util.Collection;
+import java.util.List;
 
 /**
  * Data consistency calculate parameter.
@@ -45,7 +45,7 @@ public final class DataConsistencyCalculateParameter {
     
     private final String logicTableName;
     
-    private final Collection<String> columnNames;
+    private final List<String> columnNames;
     
     private final String databaseType;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 145828c2052..8cb49bfd012 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.ingest;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -24,9 +25,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Dumper configuration.
@@ -50,6 +56,10 @@ public class DumperConfiguration {
     
     private TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
+    // LinkedHashSet is required
+    @Getter(AccessLevel.PROTECTED)
+    private Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap;
+    
     private boolean decodeWithTX;
     
     /**
@@ -95,4 +105,29 @@ public class DumperConfiguration {
     public String getSchemaName(final ActualTableName actualTableName) {
         return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
     }
+    
+    /**
+     * Get column name list of table.
+     *
+     * @param logicTableName logic table name
+     * @return column names of table
+     */
+    public Optional<List<String>> getColumnNameList(final LogicTableName logicTableName) {
+        Set<ColumnName> columnNames = null != targetTableColumnsMap ? targetTableColumnsMap.get(logicTableName) : null;
+        if (null == columnNames) {
+            return Optional.empty();
+        }
+        return Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()));
+    }
+    
+    /**
+     * Get column name set of table.
+     *
+     * @param actualTableName actual table name
+     * @return column names of table
+     */
+    public Optional<Set<ColumnName>> getColumnNameSet(final String actualTableName) {
+        Set<ColumnName> result = null != targetTableColumnsMap ? targetTableColumnsMap.get(getLogicTableName(actualTableName)) : null;
+        return Optional.ofNullable(result);
+    }
 }
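
The new targetTableColumnsMap field is the heart of this change: it lets a job restrict each logic table's dump to an explicit column set. A minimal usage sketch follows (editor illustration, not part of the commit; it assumes DumperConfiguration's implicit default constructor and the class-level Lombok @Setter suggested by this file's imports):

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public final class TargetTableColumnsExample {
    
    public static void main(final String[] args) {
        // LinkedHashSet is required (see the field comment above): it keeps the
        // configured column order stable when the set becomes a SQL projection.
        Set<ColumnName> columnNames = new LinkedHashSet<>();
        columnNames.add(new ColumnName("order_id"));
        columnNames.add(new ColumnName("user_id"));
        Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap = new LinkedHashMap<>();
        targetTableColumnsMap.put(new LogicTableName("t_order"), columnNames);
        DumperConfiguration dumperConfig = new DumperConfiguration();
        dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
        // Configured table: Optional[[order_id, user_id]]; unconfigured table: Optional.empty().
        System.out.println(dumperConfig.getColumnNameList(new LogicTableName("t_order")));
        System.out.println(dumperConfig.getColumnNameList(new LogicTableName("t_user")));
    }
}
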
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index b52539ac4c8..68a44376295 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -51,6 +51,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
         setDataSourceConfig(dumperConfig.getDataSourceConfig());
         setTableNameMap(dumperConfig.getTableNameMap());
         setTableNameSchemaNameMapping(dumperConfig.getTableNameSchemaNameMapping());
+        setTargetTableColumnsMap(dumperConfig.getTargetTableColumnsMap());
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
similarity index 54%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
index 17f312fdffd..4ff60573b6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
@@ -15,29 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
-
-import lombok.RequiredArgsConstructor;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
 
 /**
- * Basic column value reader.
+ * Column name.
+ *
+ * <p>It's case-insensitive.</p>
  */
-@RequiredArgsConstructor
-public final class BasicColumnValueReader extends AbstractColumnValueReader {
-    
-    private final String databaseType;
-    
-    @Override
-    protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        return super.defaultDoReadValue(resultSet, metaData, columnIndex);
-    }
+public final class ColumnName extends IdentifierName {
     
-    @Override
-    public String getType() {
-        return databaseType;
+    public ColumnName(final String columnName) {
+        super(columnName);
     }
 }
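
ColumnName only adds a constructor on top of IdentifierName, so the case-insensitivity promised by the Javadoc presumably lives in the parent's equals/hashCode, while getOriginal() (used by DumperConfiguration.getColumnNameList above) preserves the as-written spelling. A hedged sketch of that expected contract:

import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;

public final class ColumnNameContractExample {
    
    public static void main(final String[] args) {
        ColumnName upper = new ColumnName("ORDER_ID");
        ColumnName lower = new ColumnName("order_id");
        System.out.println(upper.equals(lower)); // expected: true, equality ignores case
        System.out.println(upper.getOriginal()); // expected: ORDER_ID, original spelling kept
    }
}
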
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
index 7fd41b6c96e..1f8aee1d05e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
@@ -68,6 +68,7 @@ public final class PipelineTableMetaData {
      * @param columnIndex the first column is 1, the second is 2, ...
      * @return column meta data
      */
+    // TODO Remove it. Get column meta data by column name for the incremental dumper, since column ordering might change.
     public PipelineColumnMetaData getColumnMetaData(final int columnIndex) {
         return getColumnMetaData(columnNames.get(columnIndex - 1));
     }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index b7ebb6dad60..05f8fc10a58 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -45,30 +45,33 @@ public interface PipelineSQLBuilder extends TypedSPI {
      *
      * @param schemaName schema name
      * @param tableName table name
+     * @param columnNames column names
      * @param uniqueKey unique key
      * @return divisible inventory dump SQL
      */
-    String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey);
+    String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
     
     /**
      * Build divisible inventory dump SQL without end value.
      *
      * @param schemaName schema name
      * @param tableName table name
+     * @param columnNames column names
      * @param uniqueKey unique key
      * @return divisible inventory dump SQL without end value
      */
-    String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String tableName, String uniqueKey);
+    String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
     
     /**
      * Build indivisible inventory dump first SQL.
      *
      * @param schemaName schema name
      * @param tableName table name
+     * @param columnNames column names
      * @param uniqueKey unique key
      * @return indivisible inventory dump SQL
      */
-    String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey);
+    String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
     
     /**
      * Build no unique key inventory dump SQL.
@@ -104,6 +107,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
      * @param record data record
      * @return filtered columns
      */
+    // TODO Consider removing extractUpdatedColumns. openGauss has a special impl currently.
     List<Column> extractUpdatedColumns(DataRecord record);
     
     /**
@@ -149,11 +153,12 @@ public interface PipelineSQLBuilder extends TypedSPI {
      *
      * @param schemaName schema name
      * @param tableName table name
+     * @param columnNames column names
      * @param uniqueKey unique key, it may be primary key, not null
      * @param firstQuery first query
      * @return query SQL
      */
-    String buildQueryAllOrderingSQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
+    String buildQueryAllOrderingSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey, boolean firstQuery);
     
     /**
      * Build check empty SQL.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 7f4461394ec..b278267f059 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -28,21 +28,17 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -69,9 +65,9 @@ public final class SingleTableInventoryDataConsistencyChecker {
     
     private final SchemaTableName targetTable;
     
-    private final PipelineColumnMetaData uniqueKey;
+    private final List<String> columnNames;
     
-    private final PipelineTableMetaDataLoader metaDataLoader;
+    private final PipelineColumnMetaData uniqueKey;
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
@@ -99,9 +95,6 @@ public final class SingleTableInventoryDataConsistencyChecker {
         String targetDatabaseType = targetDataSource.getDatabaseType().getType();
         String schemaName = sourceTable.getSchemaName().getOriginal();
         String sourceTableName = sourceTable.getTableName().getOriginal();
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
-        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, sourceTableName));
-        Collection<String> columnNames = tableMetaData.getColumnNames();
         Map<String, Object> tableCheckPositions = progressContext.getTableCheckPositions();
         DataConsistencyCalculateParameter sourceParam = buildParameter(
                 sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositions.get(sourceTableName));
@@ -162,7 +155,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
     }
     
     private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
-                                                             final String schemaName, final String tableName, final Collection<String> columnNames,
+                                                             final String schemaName, final String tableName, final List<String> columnNames,
                                                              final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey,
                                                              final Object tableCheckPosition) {
         return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPosition);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 14a6a4ad919..e926b0cadba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
 import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
@@ -93,8 +92,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         try {
             Collection<Collection<Object>> records = new LinkedList<>();
             Object maxUniqueKeyValue = null;
-            ColumnValueReader columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType())
-                    .orElseGet(() -> new BasicColumnValueReader(param.getDatabaseType()));
+            ColumnValueReader columnValueReader = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType());
             ResultSet resultSet = calculationContext.getResultSet();
             while (resultSet.next()) {
                 if (isCanceling()) {
@@ -178,13 +176,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             throw new UnsupportedOperationException("Data consistency of DATA_MATCH type not support table without unique key and primary key now");
         }
         PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
-        String logicTableName = param.getLogicTableName();
-        String schemaName = param.getSchemaName();
-        String uniqueKey = param.getUniqueKey().getName();
-        if (null != param.getTableCheckPosition()) {
-            return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, false);
-        }
-        return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, true);
+        boolean firstQuery = null == param.getTableCheckPosition();
+        return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), param.getColumnNames(), param.getUniqueKey().getName(), firstQuery);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index db2da8610d2..a929b2d9ddd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -206,9 +206,6 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
         Set<String> shardingColumns = importerConfig.getShardingColumns(record.getTableName());
-        if (null == shardingColumns) {
-            log.error("executeUpdate, could not get shardingColumns, tableName={}, logicTableNames={}", record.getTableName(), importerConfig.getLogicTableNames());
-        }
         List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
         List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
         String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 3d30e7876e4..a4e2025a497 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -58,6 +58,8 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Inventory dumper.
@@ -88,7 +90,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         this.dataSource = dataSource;
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, databaseType);
-        columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, databaseType).orElseGet(() -> new BasicColumnValueReader(databaseType));
+        columnValueReader = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, databaseType);
         this.metaDataLoader = metaDataLoader;
     }
     
@@ -141,21 +143,23 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     }
     
     private String buildInventoryDumpSQL() {
-        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+        LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
+        String schemaName = dumperConfig.getSchemaName(logicTableName);
         if (!dumperConfig.hasUniqueKey()) {
             return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, dumperConfig.getActualTableName());
         }
         PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
         PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
+        List<String> columnNames = dumperConfig.getColumnNameList(logicTableName).orElse(Collections.singletonList("*"));
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != position.getBeginValue() && null != position.getEndValue()) {
-                return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+                return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
             }
             if (null != position.getBeginValue() && null == position.getEndValue()) {
-                return sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+                return sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
             }
         }
-        return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+        return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
     }
     
     private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
@@ -185,7 +189,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         result.setType(IngestDataChangeType.INSERT);
         result.setTableName(dumperConfig.getLogicTableName());
         for (int i = 1; i <= columnCount; i++) {
-            result.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(i).isUniqueKey()));
+            String columnName = resultSetMetaData.getColumnName(i);
+            result.addColumn(new Column(columnName, columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
         }
         return result;
     }
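
Two details worth noting in this file: when no custom columns are configured, the dumper falls back to a singleton "*" list, which the SQL builder joins back into SELECT *; and column metadata is now looked up by result-set column name rather than by index, because a projected query's result-set positions no longer line up with the table's column ordinals. A sketch of the second point (editor illustration; construction mirrors the updated MySQLIncrementalDumperTest, and reliance on map insertion order is an assumption):

import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;

import java.sql.Types;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public final class NameBasedLookupExample {
    
    public static void main(final String[] args) {
        Map<String, PipelineColumnMetaData> columns = new LinkedHashMap<>();
        columns.put("order_id", new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "INT", false, true, true));
        columns.put("status", new PipelineColumnMetaData(2, "status", Types.VARCHAR, "VARCHAR", false, false, false));
        PipelineTableMetaData tableMetaData = new PipelineTableMetaData("t_order", columns, Collections.emptyList());
        // For "SELECT status FROM t_order" the result set's column 1 is "status",
        // but an index-based lookup answers with the table's first column instead.
        System.out.println(tableMetaData.getColumnMetaData(1).getName());        // order_id (mismatch)
        System.out.println(tableMetaData.getColumnMetaData("status").getName()); // status (correct)
    }
}
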
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index f6290fc4fc9..5ea009352c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -71,24 +71,31 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     protected abstract String getRightIdentifierQuoteString();
     
     @Override
-    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT * FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
+        return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
+    }
+    
+    private String buildQueryColumns(final List<String> columnNames) {
+        if (columnNames.isEmpty()) {
+            return "*";
+        }
+        return String.join(",", columnNames);
     }
     
     @Override
-    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT * FROM %s WHERE %s>=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+        return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
     }
     
     @Override
-    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
-        return String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey);
+        return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
     }
     
     @Override
@@ -183,12 +190,12 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
         return firstQuery
-                ? String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey)
-                : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+                ? String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
+                : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
     }
     
     @Override
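
The projection logic is small enough to restate standalone: an empty list degrades to *, otherwise the names are comma-joined (quoting of individual column names is left to callers here, unlike the unique key). A self-contained sketch of the SQL shapes this produces, mirroring buildDivisibleInventoryDumpSQL without a concrete dialect builder (editor illustration, not commit code):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class DumpSQLShapeExample {
    
    private static String buildQueryColumns(final List<String> columnNames) {
        return columnNames.isEmpty() ? "*" : String.join(",", columnNames);
    }
    
    public static void main(final String[] args) {
        // Custom columns configured:
        // SELECT order_id,user_id FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC
        System.out.println(String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC",
                buildQueryColumns(Arrays.asList("order_id", "user_id")), "t_order", "order_id", "order_id", "order_id"));
        // InventoryDumper's fallback is a singleton "*" list, which joins back to SELECT *.
        System.out.println(String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC",
                buildQueryColumns(Collections.singletonList("*")), "t_order", "order_id", "order_id", "order_id"));
    }
}
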
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 523a43e6b96..27341e58971 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -29,17 +29,17 @@ import java.util.Optional;
 public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     
     @Override
-    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         return "";
     }
     
     @Override
-    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         return "";
     }
     
     @Override
-    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+    public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
         return "";
     }
     
@@ -79,7 +79,7 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
         return "";
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 14e3dd709a4..d6a0e2d6462 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -57,6 +58,7 @@ import java.nio.charset.Charset;
 import java.security.SecureRandom;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * MySQL incremental dumper.
@@ -109,18 +111,16 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
             createPlaceholderRecord(event);
             return;
         }
+        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
         if (event instanceof WriteRowsEvent) {
-            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
             handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
             return;
         }
         if (event instanceof UpdateRowsEvent) {
-            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
             handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
             return;
         }
         if (event instanceof DeleteRowsEvent) {
-            PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
             handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
         }
     }
@@ -136,18 +136,27 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
+        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord record = createDataRecord(event, each.length);
             record.setType(IngestDataChangeType.INSERT);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
+                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                    continue;
+                }
+                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
             }
             channel.pushRecord(record);
         }
     }
     
+    boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
+        return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+    }
+    
     private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
+        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
             Serializable[] afterValues = event.getAfterRows().get(i);
@@ -158,21 +167,28 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
                 Serializable newValue = afterValues[j];
                 boolean updated = !Objects.equals(newValue, oldValue);
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
+                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                    continue;
+                }
                 record.addColumn(new Column(columnMetaData.getName(),
                         handleValue(columnMetaData, oldValue),
-                        handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
+                        handleValue(columnMetaData, newValue), updated, columnMetaData.isUniqueKey()));
             }
             channel.pushRecord(record);
         }
     }
     
     private void handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
+        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord record = createDataRecord(event, each.length);
             record.setType(IngestDataChangeType.DELETE);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
+                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                    continue;
+                }
+                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
             }
             channel.pushRecord(record);
         }
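
All three row-event handlers share one filtering rule via the package-visible isColumnUnneeded: a null set means no filter was configured, so every column is kept; otherwise only members of the set survive, with ColumnName making the membership test case-insensitive. A standalone restatement (editor illustration, mirroring the method added above):

import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;

import java.util.LinkedHashSet;
import java.util.Set;

public final class ColumnFilterExample {
    
    private static boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
        return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
    }
    
    public static void main(final String[] args) {
        Set<ColumnName> configured = new LinkedHashSet<>();
        configured.add(new ColumnName("order_id"));
        System.out.println(isColumnUnneeded(null, "status"));         // false: no filter configured
        System.out.println(isColumnUnneeded(configured, "order_id")); // false: explicitly wanted
        System.out.println(isColumnUnneeded(configured, "status"));   // true: dropped from the record
    }
}
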
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index ae10cc7d8f3..5ff6d3ef420 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -45,25 +46,32 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
 import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import javax.sql.DataSource;
+import java.io.Serializable;
+import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.when;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
@@ -71,22 +79,23 @@ public final class MySQLIncrementalDumperTest {
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
+    private DumperConfiguration dumperConfig;
+    
     private MySQLIncrementalDumper incrementalDumper;
     
     private MultiplexMemoryPipelineChannel channel;
     
-    @Mock
     private PipelineTableMetaData pipelineTableMetaData;
     
     @BeforeEach
     public void setUp() {
-        DumperConfiguration dumperConfig = mockDumperConfiguration();
+        dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
         dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
         channel = new MultiplexMemoryPipelineChannel(1, 10000, new EmptyAckCallback());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
-        when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true, true));
+        pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
     }
     
     private DumperConfiguration mockDumperConfiguration() {
@@ -104,57 +113,113 @@ public final class MySQLIncrementalDumperTest {
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT, status VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (order_id, user_id, status) VALUES (101, 1, 'OK'), (102, 1, 'OK')");
         }
     }
     
+    private static Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
+        return mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName, Function.identity()));
+    }
+    
+    private static List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
+        List<PipelineColumnMetaData> result = new LinkedList<>();
+        result.add(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "INT", false, true, true));
+        result.add(new PipelineColumnMetaData(1, "user_id", Types.INTEGER, "INT", false, false, false));
+        result.add(new PipelineColumnMetaData(1, "status", Types.VARCHAR, "VARCHAR", false, false, false));
+        return result;
+    }
+    
     @AfterEach
     public void tearDown() {
         dataSourceManager.close();
     }
     
     @Test
-    public void assertWriteRowsEvent() throws ReflectiveOperationException {
+    public void assertIsColumnUnneeded() {
+        assertFalse(incrementalDumper.isColumnUnneeded(null, "order_id"));
+        assertFalse(incrementalDumper.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "order_id"));
+        assertTrue(incrementalDumper.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "status"));
+    }
+    
+    @Test
+    public void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+        assertWriteRowsEvent0(null, 3);
+    }
+    
+    @Test
+    public void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException {
+        assertWriteRowsEvent0(mockTargetTableColumnsMap(), 1);
+    }
+    
+    private void assertWriteRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+        dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
-        rowsEvent.setAfterRows(Collections.singletonList(new String[]{"1", "order"}));
-        Plugins.getMemberAccessor().invoke(
-                MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+        rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
+        Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
         List<Record> actual = channel.fetchRecords(1, 0);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.INSERT));
+        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+    }
+    
+    private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+        return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
     }
     
     @Test
-    public void assertUpdateRowsEvent() throws ReflectiveOperationException {
+    public void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+        assertUpdateRowsEvent0(null, 3);
+    }
+    
+    @Test
+    public void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException {
+        assertUpdateRowsEvent0(mockTargetTableColumnsMap(), 1);
+    }
+    
+    private void assertUpdateRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+        dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
-        rowsEvent.setBeforeRows(Collections.singletonList(new String[]{"1", "order_old"}));
-        rowsEvent.setAfterRows(Collections.singletonList(new String[]{"1", "order_new"}));
-        Plugins.getMemberAccessor().invoke(
-                MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+        rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
+        Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
+        Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
         List<Record> actual = channel.fetchRecords(1, 0);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.UPDATE));
+        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+    }
+    
+    @Test
+    public void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+        assertDeleteRowsEvent0(null, 3);
     }
     
     @Test
-    public void assertDeleteRowsEvent() throws ReflectiveOperationException {
+    public void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException {
+        assertDeleteRowsEvent0(mockTargetTableColumnsMap(), 1);
+    }
+    
+    private void assertDeleteRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+        dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
-        rowsEvent.setBeforeRows(Collections.singletonList(new String[]{"1", "order"}));
-        Plugins.getMemberAccessor().invoke(
-                MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+        rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
+        Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
         List<Record> actual = channel.fetchRecords(1, 0);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.DELETE));
+        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
     }
     
     @Test
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index bbd59daa59a..ac5db47d1c0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -23,20 +23,20 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * WAL event converter.
@@ -62,61 +62,59 @@ public final class WALEventConverter {
         if (filter(event)) {
             return createPlaceholderRecord(event);
         }
+        if (!(event instanceof AbstractRowEvent)) {
+            return createPlaceholderRecord(event);
+        }
+        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowEvent) event).getTableName());
         if (event instanceof WriteRowEvent) {
-            return handleWriteRowsEvent((WriteRowEvent) event);
+            return handleWriteRowEvent((WriteRowEvent) event, tableMetaData);
         }
         if (event instanceof UpdateRowEvent) {
-            return handleUpdateRowsEvent((UpdateRowEvent) event);
+            return handleUpdateRowEvent((UpdateRowEvent) event, tableMetaData);
         }
         if (event instanceof DeleteRowEvent) {
-            return handleDeleteRowsEvent((DeleteRowEvent) event);
-        }
-        if (event instanceof PlaceholderEvent || event instanceof BeginTXEvent || event instanceof CommitTXEvent) {
-            return createPlaceholderRecord(event);
+            return handleDeleteRowEvent((DeleteRowEvent) event, tableMetaData);
         }
         throw new UnsupportedSQLOperationException("");
     }
     
     private boolean filter(final AbstractWALEvent event) {
-        if (isRowEvent(event)) {
+        if (event instanceof AbstractRowEvent) {
             AbstractRowEvent rowEvent = (AbstractRowEvent) event;
             return !dumperConfig.containsTable(rowEvent.getTableName());
         }
         return false;
     }
     
-    private boolean isRowEvent(final AbstractWALEvent event) {
-        return event instanceof WriteRowEvent || event instanceof UpdateRowEvent || event instanceof DeleteRowEvent;
-    }
-    
     private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
         return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
     }
     
-    private DataRecord handleWriteRowsEvent(final WriteRowEvent writeRowEvent) {
+    private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
+        return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+    }
+    
+    private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(writeRowEvent, writeRowEvent.getAfterRow().size());
         result.setType(IngestDataChangeType.INSERT);
-        putColumnsIntoDataRecord(result, getPipelineTableMetaData(writeRowEvent.getTableName()), writeRowEvent.getAfterRow());
+        putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
         return result;
     }
     
-    private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
-    }
-    
-    private DataRecord handleUpdateRowsEvent(final UpdateRowEvent updateRowEvent) {
+    private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(updateRowEvent, updateRowEvent.getAfterRow().size());
         result.setType(IngestDataChangeType.UPDATE);
-        putColumnsIntoDataRecord(result, getPipelineTableMetaData(updateRowEvent.getTableName()), updateRowEvent.getAfterRow());
+        String actualTableName = updateRowEvent.getTableName();
+        putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
         return result;
     }
     
-    private DataRecord handleDeleteRowsEvent(final DeleteRowEvent event) {
+    private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
-        // TODO completion columns
+        // TODO complete the columns; only primary key columns are filled for DELETE events for now
         DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
         result.setType(IngestDataChangeType.DELETE);
         // TODO Unique key may be a column within unique index
-        List<String> primaryKeyColumns = getPipelineTableMetaData(event.getTableName()).getPrimaryKeyColumns();
+        List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
             result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
         }
@@ -130,12 +128,21 @@ public final class WALEventConverter {
         return result;
     }
     
-    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
+    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
+        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(actualTableName).orElse(null);
         for (int i = 0, count = values.size(); i < count; i++) {
-            boolean isUniqueKey = tableMetaData.getColumnMetaData(i + 1).isUniqueKey();
+            PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
+            if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                continue;
+            }
+            boolean isUniqueKey = columnMetaData.isUniqueKey();
             Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
-            Column column = new Column(tableMetaData.getColumnMetaData(i + 1).getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
+            Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
     }
+    
+    boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
+        return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+    }
 }
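
The converter change above boils down to positional copy plus an optional whitelist: row values map onto columns by index, and when a custom column set is configured for the table, columns outside the set are skipped. A minimal standalone sketch of that rule, using plain strings in place of the ColumnName wrapper (assumed to implement name-based equals/hashCode, as the test below relies on):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    
    public final class ColumnFilterSketch {
        
        // Mirrors isColumnUnneeded(...) above: a null set means "no custom
        // columns configured for this table", so every column is kept.
        static boolean isColumnUnneeded(final Set<String> columnNames, final String columnName) {
            return null != columnNames && !columnNames.contains(columnName);
        }
        
        public static void main(final String[] args) {
            Set<String> custom = new HashSet<>(Arrays.asList("order_id", "user_id"));
            System.out.println(isColumnUnneeded(null, "status"));     // false: keep
            System.out.println(isColumnUnneeded(custom, "order_id")); // false: keep
            System.out.println(isColumnUnneeded(custom, "status"));   // true: skip
        }
    }
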
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index dfaefecbcb9..66e9f05caf8 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -26,7 +26,10 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -39,40 +42,57 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Place
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
 import org.postgresql.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
+import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public final class WALEventConverterTest {
     
+    private DumperConfiguration dumperConfig;
+    
     private WALEventConverter walEventConverter;
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
     private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
     
-    @Before
+    private PipelineTableMetaData pipelineTableMetaData;
+    
+    @BeforeEach
     public void setUp() {
-        DumperConfiguration dumperConfig = mockDumperConfiguration();
+        dumperConfig = mockDumperConfiguration();
         walEventConverter = new WALEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
+        pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
     }
     
-    @After
+    @AfterEach
     public void tearDown() {
         dataSourceManager.close();
     }
@@ -92,11 +112,56 @@ public final class WALEventConverterTest {
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT, status VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (order_id, user_id, status) VALUES (101, 1, 'OK'), (102, 1, 'OK')");
         }
     }
     
+    private static Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
+        return mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName, Function.identity()));
+    }
+    
+    private static List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
+        List<PipelineColumnMetaData> result = new LinkedList<>();
+        result.add(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "INT", false, true, true));
+        result.add(new PipelineColumnMetaData(2, "user_id", Types.INTEGER, "INT", false, false, false));
+        result.add(new PipelineColumnMetaData(3, "status", Types.VARCHAR, "VARCHAR", false, false, false));
+        return result;
+    }
+    
+    @Test
+    public void assertIsColumnUnneeded() {
+        assertFalse(walEventConverter.isColumnUnneeded(null, "order_id"));
+        assertFalse(walEventConverter.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "order_id"));
+        assertTrue(walEventConverter.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "status"));
+    }
+    
+    @Test
+    public void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
+        assertWriteRowEvent0(null, 3);
+    }
+    
+    @Test
+    public void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException {
+        assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
+    }
+    
+    private void assertWriteRowEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+        dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
+        WriteRowEvent rowsEvent = new WriteRowEvent();
+        rowsEvent.setDatabaseName("");
+        rowsEvent.setTableName("t_order");
+        rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
+        Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class);
+        DataRecord actual = (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
+        assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
+        assertThat(actual.getColumnCount(), is(expectedColumnCount));
+    }
+    
+    private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+        return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
+    }
+    
     @Test
     public void assertConvertBeginTXEvent() {
         BeginTXEvent beginTXEvent = new BeginTXEvent(100);
@@ -144,14 +209,16 @@ public final class WALEventConverterTest {
     
     @Test
     public void assertUnknownTable() {
-        Record record = walEventConverter.convert(mockUnknownTableEvent());
-        assertThat(record, instanceOf(PlaceholderRecord.class));
+        assertInstanceOf(PlaceholderRecord.class, walEventConverter.convert(mockUnknownTableEvent()));
     }
     
     @Test
     public void assertConvertFailure() {
-        assertThrows(UnsupportedSQLOperationException.class, () -> walEventConverter.convert(new AbstractRowEvent() {
-        }));
+        AbstractRowEvent event = new AbstractRowEvent() {
+        };
+        event.setDatabaseName("");
+        event.setTableName("t_order");
+        assertThrows(UnsupportedSQLOperationException.class, () -> walEventConverter.convert(event));
     }
     
     private AbstractRowEvent mockWriteRowEvent() {
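
The test wires one half of the new feature (setTargetTableColumnsMap) while the converter reads the other (getColumnNameSet); the bridge between them lives in the DumperConfiguration change listed in the file summary. A hypothetical sketch of that contract, assuming name-based equality for LogicTableName (the test relies on it for ColumnName) and an identity actual-to-logic table name mapping:

    import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
    import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
    
    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;
    
    final class DumperConfigurationContractSketch {
        
        private Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap;
        
        void setTargetTableColumnsMap(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap) {
            this.targetTableColumnsMap = targetTableColumnsMap;
        }
        
        // An empty Optional means "no custom columns configured": the converter keeps every column.
        Optional<Set<ColumnName>> getColumnNameSet(final String actualTableName) {
            return null == targetTableColumnsMap ? Optional.empty()
                    : Optional.ofNullable(targetTableColumnsMap.get(new LogicTableName(actualTableName)));
        }
    }
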
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 99f10a02ca4..e0197d80ef0 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -31,10 +31,12 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -107,11 +109,14 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
         PipelineDataSourceWrapper targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dataNode.getSchemaName(), dataNode.getTableName());
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(), dataNode.getTableName()));
+        List<String> columnNames = tableMetaData.getColumnNames();
         List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtil.getUniqueKeyColumns(
                 sourceTable.getSchemaName().getOriginal(), sourceTable.getTableName().getOriginal(), metaDataLoader);
         PipelineColumnMetaData uniqueKey = uniqueKeyColumns.isEmpty() ? null : uniqueKeyColumns.get(0);
         SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
-                jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, uniqueKey, metaDataLoader, readRateLimitAlgorithm, progressContext);
+                jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKey, readRateLimitAlgorithm, progressContext);
         return singleTableInventoryChecker.check(calculateAlgorithm);
     }
     
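
The checker now loads the table metadata once, fails fast when it cannot be resolved, and passes the concrete column names down instead of the metadata loader itself. For reference, the semantics of the checkNotNull(...) call above, as a simplified stand-in (not the real ShardingSpherePreconditions source):

    import java.util.function.Supplier;
    
    final class PreconditionsSketch {
        
        // Throw the lazily built exception when the value is null, otherwise fall through.
        static <T, E extends Exception> void checkNotNull(final T value, final Supplier<E> exceptionSupplier) throws E {
            if (null == value) {
                throw exceptionSupplier.get();
            }
        }
    }
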
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2182776d753..9cf6cd9164d 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -158,7 +158,7 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
         PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
         ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
         SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
-                primaryKeyMetaData, metaDataLoader, null, progressContext);
+                tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
         DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
         assertTrue(checkResult.isMatched());
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
similarity index 81%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
index 17f312fdffd..d3df289832b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractColumnValueReader;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -26,10 +26,7 @@ import java.sql.SQLException;
 /**
- * Basic column value reader.
+ * Fixture column value reader.
  */
-@RequiredArgsConstructor
-public final class BasicColumnValueReader extends AbstractColumnValueReader {
-    
-    private final String databaseType;
+public final class FixtureColumnValueReader extends AbstractColumnValueReader {
     
     @Override
     protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
@@ -38,6 +35,6 @@ public final class BasicColumnValueReader extends AbstractColumnValueReader {
     
     @Override
     public String getType() {
-        return databaseType;
+        return "H2";
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 98f3ce69d59..080c2dbcc38 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -23,11 +23,6 @@ import java.util.Optional;
 
 public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    @Override
-    public String getType() {
-        return "H2";
-    }
-    
     @Override
     protected boolean isKeyword(final String item) {
         return false;
@@ -47,4 +42,9 @@ public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder
     public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
         return Optional.empty();
     }
+    
+    @Override
+    public String getType() {
+        return "H2";
+    }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index 8c3072fdda9..2ada8ee55db 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -35,6 +36,46 @@ public final class PipelineSQLBuilderTest {
     
     private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
     
+    @Test
+    public void assertBuildDivisibleInventoryDumpSQL() {
+        String actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order", Collections.singletonList("*"), "order_id");
+        assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
+        actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
+    }
+    
+    @Test
+    public void assertBuildDivisibleInventoryDumpSQLNoEnd() {
+        String actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order", Collections.singletonList("*"), "order_id");
+        assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? ORDER BY order_id ASC"));
+        actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? ORDER BY order_id ASC"));
+    }
+    
+    @Test
+    public void assertBuildIndivisibleInventoryDumpSQL() {
+        String actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order", Collections.singletonList("*"), "order_id");
+        assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
+        actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
+    }
+    
+    @Test
+    public void assertBuildQueryAllOrderingSQLFirstQuery() {
+        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", true);
+        assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
+        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
+    }
+    
+    @Test
+    public void assertBuildQueryAllOrderingSQLNonFirstQuery() {
+        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false);
+        assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
+        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
+    }
+    
     @Test
     public void assertBuildInsertSQL() {
         String actual = pipelineSQLBuilder.buildInsertSQL(null, mockDataRecord("t2"));
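
The expected strings above suggest how the projection is presumably assembled in AbstractPipelineSQLBuilder (changed earlier in this commit, per the file summary): a singleton "*" list keeps SELECT *, and explicit column names are comma-joined without spaces. A sketch under that assumption:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    
    final class ProjectionSketch {
        
        // "*" arrives as a singleton list, so one join covers both cases.
        static String buildProjection(final List<String> columnNames) {
            return String.join(",", columnNames);
        }
        
        public static void main(final String[] args) {
            // -> SELECT * FROM t_order
            System.out.println("SELECT " + buildProjection(Collections.singletonList("*")) + " FROM t_order");
            // -> SELECT order_id,user_id,status FROM t_order
            System.out.println("SELECT " + buildProjection(Arrays.asList("order_id", "user_id", "status")) + " FROM t_order");
        }
    }
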
diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
new file mode 100644
index 00000000000..6d2c2645d60
--- /dev/null
+++ b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureColumnValueReader
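
This registration file is a standard Java SPI descriptor. The discovery it enables can be sketched with plain java.util.ServiceLoader (ShardingSphere routes lookup through its own SPI loader, but the META-INF/services contract is the same; getType() is assumed to be declared on the ColumnValueReader SPI, as the fixture's override suggests):

    import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
    
    import java.util.ServiceLoader;
    
    public final class SpiDiscoveryDemo {
        
        public static void main(final String[] args) {
            // With the test resources on the classpath this prints "H2", the
            // type key returned by FixtureColumnValueReader.getType() above.
            for (ColumnValueReader each : ServiceLoader.load(ColumnValueReader.class)) {
                System.out.println(each.getType());
            }
        }
    }
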