You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/05/01 02:47:47 UTC
[shardingsphere] branch master updated: Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 43db2cf851b Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)
43db2cf851b is described below
commit 43db2cf851b91bca7cc41103b985b81521f984fd
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun May 1 10:47:39 2022 +0800
Refactor InventoryDumperConfiguration and DumperConfiguration (#17236)
---
.../consistency/DataConsistencyCheckResult.java | 2 +-
.../api/config/ingest/DumperConfiguration.java | 22 ++++++++++++++++++++++
.../ingest/InventoryDumperConfiguration.java | 5 ++++-
.../ingest/dumper/AbstractInventoryDumper.java | 7 ++++---
.../data/pipeline/core/task/InventoryTask.java | 2 +-
.../rulealtered/prepare/InventoryTaskSplitter.java | 22 ++++++++++++----------
.../mysql/ingest/MySQLIncrementalDumper.java | 4 ++--
.../pipeline/mysql/ingest/MySQLJdbcDumperTest.java | 3 ++-
.../postgresql/ingest/wal/WalEventConverter.java | 4 ++--
.../ingest/PostgreSQLJdbcDumperTest.java | 3 ++-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 3 ++-
.../data/pipeline/core/task/InventoryTaskTest.java | 7 +++++--
12 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 04452249d44..00bf9120381 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -26,7 +26,7 @@ import lombok.ToString;
*/
@RequiredArgsConstructor
@Getter
-@ToString(callSuper = true)
+@ToString
public final class DataConsistencyCheckResult {
private final DataConsistencyCountCheckResult countCheckResult;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 2d6525d9ae5..6874a13d2ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -31,6 +31,8 @@ import java.util.Map;
@Getter
@Setter
@ToString(exclude = "dataSourceConfig")
+// TODO it should be final and not be extended by sub-classes
+// TODO fields final
public class DumperConfiguration {
private String dataSourceName;
@@ -43,4 +45,24 @@ public class DumperConfiguration {
* Table name map. Key is actual table name, value is logic table name.
*/
private Map<String, String> tableNameMap;
+
+ /**
+ * Get logic table name.
+ *
+ * @param actualTableName actual table name
+ * @return logic table name
+ */
+ public String getLogicTableName(final String actualTableName) {
+ return tableNameMap.get(actualTableName);
+ }
+
+ /**
+ * Whether contains table.
+ *
+ * @param actualTableName actual table name
+ * @return contains or not
+ */
+ public boolean containsTable(final String actualTableName) {
+ return tableNameMap.containsKey(actualTableName);
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 52716ccaacf..a05301a315c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -28,9 +28,12 @@ import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
@Getter
@Setter
@ToString(callSuper = true)
+// TODO fields final
public final class InventoryDumperConfiguration extends DumperConfiguration {
- private String tableName;
+ private String actualTableName;
+
+ private String logicTableName;
private String primaryKey;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 61e0139119c..e665a5cd4f3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -84,7 +84,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
@Override
protected PipelineTableMetaData initialize() {
- return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
+ return metaDataLoader.getTableMetaData(inventoryDumperConfig.getActualTableName());
}
};
}
@@ -120,7 +120,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
}
private String getDumpSQL() {
- String tableName = inventoryDumperConfig.getTableName();
+ String tableName = inventoryDumperConfig.getActualTableName();
String primaryKey = inventoryDumperConfig.getPrimaryKey();
return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
}
@@ -143,10 +143,11 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
ResultSetMetaData metaData = resultSet.getMetaData();
int rowCount = 0;
Number maxUniqueKeyValue = null;
+ String logicTableName = inventoryDumperConfig.getLogicTableName();
while (resultSet.next()) {
DataRecord record = new DataRecord(newPosition(resultSet), metaData.getColumnCount());
record.setType(IngestDataChangeType.INSERT);
- record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
+ record.setTableName(logicTableName);
for (int i = 1; i <= metaData.getColumnCount(); i++) {
boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 1);
Object value = readValue(resultSet, i);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index b4d814b8334..85dd9222f49 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -77,7 +77,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
}
private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
- String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getTableName());
+ String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index cc8c76f4707..9ed950d09dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -94,7 +94,8 @@ public final class InventoryTaskSplitter {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
dumperConfig.getTableNameMap().forEach((key, value) -> {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
- inventoryDumperConfig.setTableName(key);
+ inventoryDumperConfig.setActualTableName(key);
+ inventoryDumperConfig.setLogicTableName(value);
inventoryDumperConfig.setPosition(new PlaceholderPosition());
result.add(inventoryDumperConfig);
});
@@ -117,7 +118,8 @@ public final class InventoryTaskSplitter {
InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
splitDumperConfig.setPosition(inventoryPosition);
splitDumperConfig.setShardingItem(i++);
- splitDumperConfig.setTableName(dumperConfig.getTableName());
+ splitDumperConfig.setActualTableName(dumperConfig.getActualTableName());
+ splitDumperConfig.setLogicTableName(dumperConfig.getLogicTableName());
splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
splitDumperConfig.setBatchSize(batchSize);
splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
@@ -130,10 +132,10 @@ public final class InventoryTaskSplitter {
final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
JobProgress initProgress = jobContext.getInitProgress();
if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
- Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getTableName()).values();
+ Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
for (IngestPosition<?> each : result) {
if (each instanceof PrimaryKeyPosition) {
- String primaryKey = metaDataLoader.getTableMetaData(dumperConfig.getTableName()).getPrimaryKeyColumns().get(0);
+ String primaryKey = metaDataLoader.getTableMetaData(dumperConfig.getActualTableName()).getPrimaryKeyColumns().get(0);
dumperConfig.setPrimaryKey(primaryKey);
break;
}
@@ -141,13 +143,13 @@ public final class InventoryTaskSplitter {
// Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
return result;
}
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getTableName());
- if (isSpiltByPrimaryKeyRange(tableMetaData, dumperConfig.getTableName())) {
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getActualTableName());
+ if (isSpiltByPrimaryKeyRange(tableMetaData, dumperConfig.getActualTableName())) {
String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
dumperConfig.setPrimaryKey(primaryKey);
return getPositionByPrimaryKeyRange(jobContext, dataSource, dumperConfig);
}
- throw new PipelineJobCreationException("Can not split by primary key range for table " + dumperConfig.getTableName());
+ throw new PipelineJobCreationException("Can not split by primary key range for table " + dumperConfig.getActualTableName());
}
private boolean isSpiltByPrimaryKeyRange(final PipelineTableMetaData tableMetaData, final String tableName) {
@@ -179,7 +181,7 @@ public final class InventoryTaskSplitter {
Collection<IngestPosition<?>> result = new ArrayList<>();
RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getSourceDatabaseType())
- .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
+ .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey());
try (
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
@@ -194,7 +196,7 @@ public final class InventoryTaskSplitter {
}
long endId = rs.getLong(1);
if (endId == 0) {
- log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getTableName(), dumperConfig.getPrimaryKey(), beginId);
+ log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), beginId);
break;
}
result.add(new PrimaryKeyPosition(beginId, endId));
@@ -206,7 +208,7 @@ public final class InventoryTaskSplitter {
result.add(new PrimaryKeyPosition(0, 0));
}
} catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getTableName(), dumperConfig.getPrimaryKey()), ex);
+ throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey()), ex);
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 89077baaa52..f58bc94d24e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -120,7 +120,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
}
private boolean filter(final String database, final AbstractRowsEvent event) {
- return !event.getDatabaseName().equals(database) || !dumperConfig.getTableNameMap().containsKey(event.getTableName());
+ return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
}
private void handleWriteRowsEvent(final WriteRowsEvent event) {
@@ -176,7 +176,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
- result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
+ result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()));
result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
index 9c894e04c9c..32c2130dba1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
@@ -61,7 +61,8 @@ public final class MySQLJdbcDumperTest {
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
- result.setTableName("t_order");
+ result.setActualTableName("t_order_0");
+ result.setLogicTableName("t_order");
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index 44be238bb0e..0aeb40ce057 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -72,7 +72,7 @@ public final class WalEventConverter {
private boolean filter(final AbstractWalEvent event) {
if (isRowEvent(event)) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
- return !dumperConfig.getTableNameMap().containsKey(rowEvent.getTableName());
+ return !dumperConfig.containsTable(rowEvent.getTableName());
}
return false;
}
@@ -114,7 +114,7 @@ public final class WalEventConverter {
private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
- result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
+ result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()));
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 7d0dc137ac2..ea3f6b9be84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -56,7 +56,8 @@ public final class PostgreSQLJdbcDumperTest {
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfig = mockDumperConfiguration();
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
- result.setTableName("t_order");
+ result.setActualTableName("t_order");
+ result.setLogicTableName("t_order");
return result;
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f89fac3cf89..9897a6ba739 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -149,7 +149,8 @@ public final class GovernanceRepositoryAPIImplTest {
private InventoryTask mockInventoryTask(final TaskConfiguration taskConfig) {
InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
dumperConfig.setPosition(new PlaceholderPosition());
- dumperConfig.setTableName("t_order");
+ dumperConfig.setActualTableName("t_order");
+ dumperConfig.setLogicTableName("t_order");
dumperConfig.setPrimaryKey("order_id");
dumperConfig.setShardingItem(0);
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index c40de0ea22b..49f85712ac7 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -60,7 +60,8 @@ public final class InventoryTaskTest {
@Test(expected = IngestException.class)
public void assertStartWithGetEstimatedRowsFailure() {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
- inventoryDumperConfig.setTableName("t_non_exist");
+ inventoryDumperConfig.setActualTableName("t_non_exist");
+ inventoryDumperConfig.setLogicTableName("t_non_exist");
IngestPosition<?> position = taskConfig.getDumperConfig().getPosition();
if (null == position) {
position = new PrimaryKeyPosition(0, 1000);
@@ -80,7 +81,9 @@ public final class InventoryTaskTest {
public void assertGetProgress() throws SQLException {
initTableData(taskConfig.getDumperConfig());
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
- inventoryDumperConfig.setTableName("t_order");
+ // TODO use t_order_0 as the actual table name, and update the others too
+ inventoryDumperConfig.setActualTableName("t_order");
+ inventoryDumperConfig.setLogicTableName("t_order");
IngestPosition<?> position = taskConfig.getDumperConfig().getPosition();
if (null == position) {
position = new PrimaryKeyPosition(0, 1000);