You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/05/01 02:47:47 UTC

[shardingsphere] branch master updated: Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 43db2cf851b Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)
43db2cf851b is described below

commit 43db2cf851b91bca7cc41103b985b81521f984fd
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun May 1 10:47:39 2022 +0800

    Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)
---
 .../consistency/DataConsistencyCheckResult.java    |  2 +-
 .../api/config/ingest/DumperConfiguration.java     | 22 ++++++++++++++++++++++
 .../ingest/InventoryDumperConfiguration.java       |  5 ++++-
 .../ingest/dumper/AbstractInventoryDumper.java     |  7 ++++---
 .../data/pipeline/core/task/InventoryTask.java     |  2 +-
 .../rulealtered/prepare/InventoryTaskSplitter.java | 22 ++++++++++++----------
 .../mysql/ingest/MySQLIncrementalDumper.java       |  4 ++--
 .../pipeline/mysql/ingest/MySQLJdbcDumperTest.java |  3 ++-
 .../postgresql/ingest/wal/WalEventConverter.java   |  4 ++--
 .../ingest/PostgreSQLJdbcDumperTest.java           |  3 ++-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  3 ++-
 .../data/pipeline/core/task/InventoryTaskTest.java |  7 +++++--
 12 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 04452249d44..00bf9120381 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -26,7 +26,7 @@ import lombok.ToString;
  */
 @RequiredArgsConstructor
 @Getter
-@ToString(callSuper = true)
+@ToString
 public final class DataConsistencyCheckResult {
     
     private final DataConsistencyCountCheckResult countCheckResult;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 2d6525d9ae5..6874a13d2ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -31,6 +31,8 @@ import java.util.Map;
 @Getter
 @Setter
 @ToString(exclude = "dataSourceConfig")
+// TODO this class should be final and not extended by sub-classes
+// TODO make fields final
 public class DumperConfiguration {
     
     private String dataSourceName;
@@ -43,4 +45,24 @@ public class DumperConfiguration {
      * Table name map. Key is actual table name, value is logic table name.
      */
     private Map<String, String> tableNameMap;
+    
+    /**
+     * Get logic table name.
+     *
+     * @param actualTableName actual table name
+     * @return logic table name
+     */
+    public String getLogicTableName(final String actualTableName) {
+        return tableNameMap.get(actualTableName);
+    }
+    
+    /**
+     * Whether contains table.
+     *
+     * @param actualTableName actual table name
+     * @return contains or not
+     */
+    public boolean containsTable(final String actualTableName) {
+        return tableNameMap.containsKey(actualTableName);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 52716ccaacf..a05301a315c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -28,9 +28,12 @@ import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
 @Getter
 @Setter
 @ToString(callSuper = true)
+// TODO make fields final
 public final class InventoryDumperConfiguration extends DumperConfiguration {
     
-    private String tableName;
+    private String actualTableName;
+    
+    private String logicTableName;
     
     private String primaryKey;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 61e0139119c..e665a5cd4f3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -84,7 +84,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
             
             @Override
             protected PipelineTableMetaData initialize() {
-                return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
+                return metaDataLoader.getTableMetaData(inventoryDumperConfig.getActualTableName());
             }
         };
     }
@@ -120,7 +120,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     }
     
     private String getDumpSQL() {
-        String tableName = inventoryDumperConfig.getTableName();
+        String tableName = inventoryDumperConfig.getActualTableName();
         String primaryKey = inventoryDumperConfig.getPrimaryKey();
         return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
     }
@@ -143,10 +143,11 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
                 ResultSetMetaData metaData = resultSet.getMetaData();
                 int rowCount = 0;
                 Number maxUniqueKeyValue = null;
+                String logicTableName = inventoryDumperConfig.getLogicTableName();
                 while (resultSet.next()) {
                     DataRecord record = new DataRecord(newPosition(resultSet), metaData.getColumnCount());
                     record.setType(IngestDataChangeType.INSERT);
-                    record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
+                    record.setTableName(logicTableName);
                     for (int i = 1; i <= metaData.getColumnCount(); i++) {
                         boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 1);
                         Object value = readValue(resultSet, i);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index b4d814b8334..85dd9222f49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -77,7 +77,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     }
     
     private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
-        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getTableName());
+        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
         return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index cc8c76f4707..9ed950d09dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -94,7 +94,8 @@ public final class InventoryTaskSplitter {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         dumperConfig.getTableNameMap().forEach((key, value) -> {
             InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
-            inventoryDumperConfig.setTableName(key);
+            inventoryDumperConfig.setActualTableName(key);
+            inventoryDumperConfig.setLogicTableName(value);
             inventoryDumperConfig.setPosition(new PlaceholderPosition());
             result.add(inventoryDumperConfig);
         });
@@ -117,7 +118,8 @@ public final class InventoryTaskSplitter {
             InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
             splitDumperConfig.setPosition(inventoryPosition);
             splitDumperConfig.setShardingItem(i++);
-            splitDumperConfig.setTableName(dumperConfig.getTableName());
+            splitDumperConfig.setActualTableName(dumperConfig.getActualTableName());
+            splitDumperConfig.setLogicTableName(dumperConfig.getLogicTableName());
             splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
             splitDumperConfig.setBatchSize(batchSize);
             splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
@@ -130,10 +132,10 @@ public final class InventoryTaskSplitter {
                                                                 final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         JobProgress initProgress = jobContext.getInitProgress();
         if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
-            Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getTableName()).values();
+            Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
             for (IngestPosition<?> each : result) {
                 if (each instanceof PrimaryKeyPosition) {
-                    String primaryKey = metaDataLoader.getTableMetaData(dumperConfig.getTableName()).getPrimaryKeyColumns().get(0);
+                    String primaryKey = metaDataLoader.getTableMetaData(dumperConfig.getActualTableName()).getPrimaryKeyColumns().get(0);
                     dumperConfig.setPrimaryKey(primaryKey);
                     break;
                 }
@@ -141,13 +143,13 @@ public final class InventoryTaskSplitter {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
             return result;
         }
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getTableName());
-        if (isSpiltByPrimaryKeyRange(tableMetaData, dumperConfig.getTableName())) {
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getActualTableName());
+        if (isSpiltByPrimaryKeyRange(tableMetaData, dumperConfig.getActualTableName())) {
             String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
             dumperConfig.setPrimaryKey(primaryKey);
             return getPositionByPrimaryKeyRange(jobContext, dataSource, dumperConfig);
         }
-        throw new PipelineJobCreationException("Can not split by primary key range for table " + dumperConfig.getTableName());
+        throw new PipelineJobCreationException("Can not split by primary key range for table " + dumperConfig.getActualTableName());
     }
     
     private boolean isSpiltByPrimaryKeyRange(final PipelineTableMetaData tableMetaData, final String tableName) {
@@ -179,7 +181,7 @@ public final class InventoryTaskSplitter {
         Collection<IngestPosition<?>> result = new ArrayList<>();
         RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
         String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getSourceDatabaseType())
-                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
+                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey());
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement ps = connection.prepareStatement(sql)) {
@@ -194,7 +196,7 @@ public final class InventoryTaskSplitter {
                     }
                     long endId = rs.getLong(1);
                     if (endId == 0) {
-                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getTableName(), dumperConfig.getPrimaryKey(), beginId);
+                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), beginId);
                         break;
                     }
                     result.add(new PrimaryKeyPosition(beginId, endId));
@@ -206,7 +208,7 @@ public final class InventoryTaskSplitter {
                 result.add(new PrimaryKeyPosition(0, 0));
             }
         } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getTableName(), dumperConfig.getPrimaryKey()), ex);
+            throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey()), ex);
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 89077baaa52..f58bc94d24e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -120,7 +120,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     }
     
     private boolean filter(final String database, final AbstractRowsEvent event) {
-        return !event.getDatabaseName().equals(database) || !dumperConfig.getTableNameMap().containsKey(event.getTableName());
+        return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
     }
     
     private void handleWriteRowsEvent(final WriteRowsEvent event) {
@@ -176,7 +176,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     
     private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
         DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
-        result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
+        result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()));
         result.setCommitTime(rowsEvent.getTimestamp() * 1000);
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
index 9c894e04c9c..32c2130dba1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
@@ -61,7 +61,8 @@ public final class MySQLJdbcDumperTest {
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
-        result.setTableName("t_order");
+        result.setActualTableName("t_order_0");
+        result.setLogicTableName("t_order");
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index 44be238bb0e..0aeb40ce057 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -72,7 +72,7 @@ public final class WalEventConverter {
     private boolean filter(final AbstractWalEvent event) {
         if (isRowEvent(event)) {
             AbstractRowEvent rowEvent = (AbstractRowEvent) event;
-            return !dumperConfig.getTableNameMap().containsKey(rowEvent.getTableName());
+            return !dumperConfig.containsTable(rowEvent.getTableName());
         }
         return false;
     }
@@ -114,7 +114,7 @@ public final class WalEventConverter {
     
     private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
         DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
-        result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
+        result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()));
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 7d0dc137ac2..ea3f6b9be84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -56,7 +56,8 @@ public final class PostgreSQLJdbcDumperTest {
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
-        result.setTableName("t_order");
+        result.setActualTableName("t_order");
+        result.setLogicTableName("t_order");
         return result;
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f89fac3cf89..9897a6ba739 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -149,7 +149,8 @@ public final class GovernanceRepositoryAPIImplTest {
     private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
         InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
         dumperConfig.setPosition(new PlaceholderPosition());
-        dumperConfig.setTableName("t_order");
+        dumperConfig.setActualTableName("t_order");
+        dumperConfig.setLogicTableName("t_order");
         dumperConfig.setPrimaryKey("order_id");
         dumperConfig.setShardingItem(0);
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index c40de0ea22b..49f85712ac7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -60,7 +60,8 @@ public final class InventoryTaskTest {
     @Test(expected = IngestException.class)
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
-        inventoryDumperConfig.setTableName("t_non_exist");
+        inventoryDumperConfig.setActualTableName("t_non_exist");
+        inventoryDumperConfig.setLogicTableName("t_non_exist");
         IngestPosition<?> position = taskConfig.getDumperConfig().getPosition();
         if (null == position) {
             position = new PrimaryKeyPosition(0, 1000);
@@ -80,7 +81,9 @@ public final class InventoryTaskTest {
     public void assertGetProgress() throws SQLException {
         initTableData(taskConfig.getDumperConfig());
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
-        inventoryDumperConfig.setTableName("t_order");
+        // TODO use t_order_0, and also others
+        inventoryDumperConfig.setActualTableName("t_order");
+        inventoryDumperConfig.setLogicTableName("t_order");
         IngestPosition<?> position = taskConfig.getDumperConfig().getPosition();
         if (null == position) {
             position = new PrimaryKeyPosition(0, 1000);