You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/03/11 01:39:15 UTC
[shardingsphere] branch master updated: Support custom columns in pipeline job (#24546)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 59e45c23df7 Support custom columns in pipeline job (#24546)
59e45c23df7 is described below
commit 59e45c23df77268b3c8dec935b200d0758438286
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Mar 11 09:38:58 2023 +0800
Support custom columns in pipeline job (#24546)
---
.../DataConsistencyCalculateParameter.java | 4 +-
.../api/config/ingest/DumperConfiguration.java | 35 +++++++
.../ingest/InventoryDumperConfiguration.java | 1 +
.../data/pipeline/api/metadata/ColumnName.java} | 27 ++----
.../api/metadata/model/PipelineTableMetaData.java | 1 +
.../spi/sqlbuilder/PipelineSQLBuilder.java | 13 ++-
...SingleTableInventoryDataConsistencyChecker.java | 15 +--
...DataMatchDataConsistencyCalculateAlgorithm.java | 13 +--
.../pipeline/core/importer/DataSourceImporter.java | 3 -
.../core/ingest/dumper/InventoryDumper.java | 17 ++--
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 25 +++--
.../fixture/FixturePipelineSQLBuilder.java | 8 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 28 ++++--
.../mysql/ingest/MySQLIncrementalDumperTest.java | 107 +++++++++++++++++----
.../postgresql/ingest/wal/WALEventConverter.java | 61 ++++++------
.../ingest/wal/WALEventConverterTest.java | 91 +++++++++++++++---
.../MigrationDataConsistencyChecker.java | 7 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../core/fixture/FixtureColumnValueReader.java | 11 +--
.../core/fixture/FixturePipelineSQLBuilder.java | 10 +-
.../spi/sqlbuilder/PipelineSQLBuilderTest.java | 41 ++++++++
...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 ++++
22 files changed, 389 insertions(+), 149 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 9fb97eb3f3a..cbfc9eeb87d 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -24,7 +24,7 @@ import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import java.util.Collection;
+import java.util.List;
/**
* Data consistency calculate parameter.
@@ -45,7 +45,7 @@ public final class DataConsistencyCalculateParameter {
private final String logicTableName;
- private final Collection<String> columnNames;
+ private final List<String> columnNames;
private final String databaseType;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 145828c2052..8cb49bfd012 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@@ -24,9 +25,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Dumper configuration.
@@ -50,6 +56,10 @@ public class DumperConfiguration {
private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ // LinkedHashSet is required
+ @Getter(AccessLevel.PROTECTED)
+ private Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap;
+
private boolean decodeWithTX;
/**
@@ -95,4 +105,29 @@ public class DumperConfiguration {
public String getSchemaName(final ActualTableName actualTableName) {
return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
}
+
+ /**
+ * Get column name list of table.
+ *
+ * @param logicTableName logic table name
+ * @return column names of table
+ */
+ public Optional<List<String>> getColumnNameList(final LogicTableName logicTableName) {
+ Set<ColumnName> columnNames = null != targetTableColumnsMap ? targetTableColumnsMap.get(logicTableName) : null;
+ if (null == columnNames) {
+ return Optional.empty();
+ }
+ return Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()));
+ }
+
+ /**
+ * Get column name set of table.
+ *
+ * @param actualTableName actual table name
+ * @return column names of table
+ */
+ public Optional<Set<ColumnName>> getColumnNameSet(final String actualTableName) {
+ Set<ColumnName> result = null != targetTableColumnsMap ? targetTableColumnsMap.get(getLogicTableName(actualTableName)) : null;
+ return Optional.ofNullable(result);
+ }
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index b52539ac4c8..68a44376295 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -51,6 +51,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
setTableNameSchemaNameMapping(dumperConfig.getTableNameSchemaNameMapping());
+ setTargetTableColumnsMap(dumperConfig.getTargetTableColumnsMap());
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
similarity index 54%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
index 17f312fdffd..4ff60573b6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/ColumnName.java
@@ -15,29 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
-
-import lombok.RequiredArgsConstructor;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
/**
- * Basic column value reader.
+ * Column name.
+ *
+ * <p>It's case-insensitive.</p>
*/
-@RequiredArgsConstructor
-public final class BasicColumnValueReader extends AbstractColumnValueReader {
-
- private final String databaseType;
-
- @Override
- protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- return super.defaultDoReadValue(resultSet, metaData, columnIndex);
- }
+public final class ColumnName extends IdentifierName {
- @Override
- public String getType() {
- return databaseType;
+ public ColumnName(final String columnName) {
+ super(columnName);
}
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
index 7fd41b6c96e..1f8aee1d05e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
@@ -68,6 +68,7 @@ public final class PipelineTableMetaData {
* @param columnIndex the first column is 1, the second is 2, ...
* @return column meta data
*/
+ // TODO Remove it. Get column meta data by column name for incremental dumper, since columns ordering might be changed.
public PipelineColumnMetaData getColumnMetaData(final int columnIndex) {
return getColumnMetaData(columnNames.get(columnIndex - 1));
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index b7ebb6dad60..05f8fc10a58 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -45,30 +45,33 @@ public interface PipelineSQLBuilder extends TypedSPI {
*
* @param schemaName schema name
* @param tableName table name
+ * @param columnNames column names
* @param uniqueKey unique key
* @return divisible inventory dump SQL
*/
- String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey);
+ String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
/**
* Build divisible inventory dump SQL without end value.
*
* @param schemaName schema name
* @param tableName table name
+ * @param columnNames column names
* @param uniqueKey unique key
* @return divisible inventory dump SQL without end value
*/
- String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String tableName, String uniqueKey);
+ String buildDivisibleInventoryDumpSQLNoEnd(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
/**
* Build indivisible inventory dump first SQL.
*
* @param schemaName schema name
* @param tableName table name
+ * @param columnNames column names
* @param uniqueKey unique key
* @return indivisible inventory dump SQL
*/
- String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey);
+ String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey);
/**
* Build no unique key inventory dump SQL.
@@ -104,6 +107,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
* @param record data record
* @return filtered columns
*/
+ // TODO Consider remove extractUpdatedColumns. openGauss has special impl currently
List<Column> extractUpdatedColumns(DataRecord record);
/**
@@ -149,11 +153,12 @@ public interface PipelineSQLBuilder extends TypedSPI {
*
* @param schemaName schema name
* @param tableName table name
+ * @param columnNames column names
* @param uniqueKey unique key, it may be primary key, not null
* @param firstQuery first query
* @return query SQL
*/
- String buildQueryAllOrderingSQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
+ String buildQueryAllOrderingSQL(String schemaName, String tableName, List<String> columnNames, String uniqueKey, boolean firstQuery);
/**
* Build check empty SQL.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 7f4461394ec..b278267f059 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -28,21 +28,17 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
@@ -69,9 +65,9 @@ public final class SingleTableInventoryDataConsistencyChecker {
private final SchemaTableName targetTable;
- private final PipelineColumnMetaData uniqueKey;
+ private final List<String> columnNames;
- private final PipelineTableMetaDataLoader metaDataLoader;
+ private final PipelineColumnMetaData uniqueKey;
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
@@ -99,9 +95,6 @@ public final class SingleTableInventoryDataConsistencyChecker {
String targetDatabaseType = targetDataSource.getDatabaseType().getType();
String schemaName = sourceTable.getSchemaName().getOriginal();
String sourceTableName = sourceTable.getTableName().getOriginal();
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
- ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(schemaName, sourceTableName));
- Collection<String> columnNames = tableMetaData.getColumnNames();
Map<String, Object> tableCheckPositions = progressContext.getTableCheckPositions();
DataConsistencyCalculateParameter sourceParam = buildParameter(
sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositions.get(sourceTableName));
@@ -162,7 +155,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
}
private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
- final String schemaName, final String tableName, final Collection<String> columnNames,
+ final String schemaName, final String tableName, final List<String> columnNames,
final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey,
final Object tableCheckPosition) {
return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPosition);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 14a6a4ad919..e926b0cadba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
@@ -93,8 +92,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
try {
Collection<Collection<Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
- ColumnValueReader columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType())
- .orElseGet(() -> new BasicColumnValueReader(param.getDatabaseType()));
+ ColumnValueReader columnValueReader = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType());
ResultSet resultSet = calculationContext.getResultSet();
while (resultSet.next()) {
if (isCanceling()) {
@@ -178,13 +176,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
throw new UnsupportedOperationException("Data consistency of DATA_MATCH type not support table without unique key and primary key now");
}
PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
- String logicTableName = param.getLogicTableName();
- String schemaName = param.getSchemaName();
- String uniqueKey = param.getUniqueKey().getName();
- if (null != param.getTableCheckPosition()) {
- return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, false);
- }
- return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, true);
+ boolean firstQuery = null == param.getTableCheckPosition();
+ return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), param.getColumnNames(), param.getUniqueKey().getName(), firstQuery);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index db2da8610d2..a929b2d9ddd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -206,9 +206,6 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
Set<String> shardingColumns = importerConfig.getShardingColumns(record.getTableName());
- if (null == shardingColumns) {
- log.error("executeUpdate, could not get shardingColumns, tableName={}, logicTableNames={}", record.getTableName(), importerConfig.getLogicTableNames());
- }
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 3d30e7876e4..a4e2025a497 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -58,6 +58,8 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
/**
* Inventory dumper.
@@ -88,7 +90,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
this.dataSource = dataSource;
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, databaseType);
- columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, databaseType).orElseGet(() -> new BasicColumnValueReader(databaseType));
+ columnValueReader = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, databaseType);
this.metaDataLoader = metaDataLoader;
}
@@ -141,21 +143,23 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
private String buildInventoryDumpSQL() {
- String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+ LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
+ String schemaName = dumperConfig.getSchemaName(logicTableName);
if (!dumperConfig.hasUniqueKey()) {
return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, dumperConfig.getActualTableName());
}
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
+ List<String> columnNames = dumperConfig.getColumnNameList(logicTableName).orElse(Collections.singletonList("*"));
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != position.getBeginValue() && null != position.getEndValue()) {
- return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+ return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
if (null != position.getBeginValue() && null == position.getEndValue()) {
- return sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+ return sqlBuilder.buildDivisibleInventoryDumpSQLNoEnd(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
}
- return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), firstColumn.getName());
+ return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
}
private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
@@ -185,7 +189,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
result.setType(IngestDataChangeType.INSERT);
result.setTableName(dumperConfig.getLogicTableName());
for (int i = 1; i <= columnCount; i++) {
- result.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(i).isUniqueKey()));
+ String columnName = resultSetMetaData.getColumnName(i);
+ result.addColumn(new Column(columnName, columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
}
return result;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index f6290fc4fc9..5ea009352c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -71,24 +71,31 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
protected abstract String getRightIdentifierQuoteString();
@Override
- public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT * FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
+ return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
+ }
+
+ private String buildQueryColumns(final List<String> columnNames) {
+ if (columnNames.isEmpty()) {
+ return "*";
+ }
+ return String.join(",", columnNames);
}
@Override
- public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT * FROM %s WHERE %s>=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+ return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
}
@Override
- public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey);
+ return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
}
@Override
@@ -183,12 +190,12 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+ public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
return firstQuery
- ? String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey)
- : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+ ? String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey)
+ : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
}
@Override
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 523a43e6b96..27341e58971 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -29,17 +29,17 @@ import java.util.Optional;
public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
@Override
- public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
return "";
}
@Override
- public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildDivisibleInventoryDumpSQLNoEnd(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
return "";
}
@Override
- public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey) {
+ public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
return "";
}
@@ -79,7 +79,7 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
}
@Override
- public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+ public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey, final boolean firstQuery) {
return "";
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 14e3dd709a4..d6a0e2d6462 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -57,6 +58,7 @@ import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
/**
* MySQL incremental dumper.
@@ -109,18 +111,16 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
createPlaceholderRecord(event);
return;
}
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
if (event instanceof WriteRowsEvent) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
return;
}
if (event instanceof UpdateRowsEvent) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
return;
}
if (event instanceof DeleteRowsEvent) {
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
}
}
@@ -136,18 +136,27 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
}
private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
+ Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
for (Serializable[] each : event.getAfterRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType(IngestDataChangeType.INSERT);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
- record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
+ if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+ continue;
+ }
+ record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
}
channel.pushRecord(record);
}
}
+ boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
+ return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+ }
+
private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
+ Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Serializable[] afterValues = event.getAfterRows().get(i);
@@ -158,21 +167,28 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
Serializable newValue = afterValues[j];
boolean updated = !Objects.equals(newValue, oldValue);
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
+ if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+ continue;
+ }
record.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, oldValue),
- handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
+ handleValue(columnMetaData, newValue), updated, columnMetaData.isUniqueKey()));
}
channel.pushRecord(record);
}
}
private void handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
+ Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
for (Serializable[] each : event.getBeforeRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType(IngestDataChangeType.DELETE);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
- record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
+ if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+ continue;
+ }
+ record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
}
channel.pushRecord(record);
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index ae10cc7d8f3..5ff6d3ef420 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -45,25 +46,32 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import javax.sql.DataSource;
+import java.io.Serializable;
+import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.when;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -71,22 +79,23 @@ public final class MySQLIncrementalDumperTest {
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+ private DumperConfiguration dumperConfig;
+
private MySQLIncrementalDumper incrementalDumper;
private MultiplexMemoryPipelineChannel channel;
- @Mock
private PipelineTableMetaData pipelineTableMetaData;
@BeforeEach
public void setUp() {
- DumperConfiguration dumperConfig = mockDumperConfiguration();
+ dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root"));
channel = new MultiplexMemoryPipelineChannel(1, 10000, new EmptyAckCallback());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
- when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true, true));
+ pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
}
private DumperConfiguration mockDumperConfiguration() {
@@ -104,57 +113,113 @@ public final class MySQLIncrementalDumperTest {
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT, status VARCHAR(12))");
+ statement.execute("INSERT INTO t_order (order_id, user_id, status) VALUES (101, 1, 'OK'), (102, 1, 'OK')");
}
}
+ private static Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
+ return mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName, Function.identity()));
+ }
+
+ private static List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
+ List<PipelineColumnMetaData> result = new LinkedList<>();
+ result.add(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "INT", false, true, true));
+ result.add(new PipelineColumnMetaData(1, "user_id", Types.INTEGER, "INT", false, false, false));
+ result.add(new PipelineColumnMetaData(1, "status", Types.VARCHAR, "VARCHAR", false, false, false));
+ return result;
+ }
+
@AfterEach
public void tearDown() {
dataSourceManager.close();
}
@Test
- public void assertWriteRowsEvent() throws ReflectiveOperationException {
+ public void assertIsColumnUnneeded() {
+ assertFalse(incrementalDumper.isColumnUnneeded(null, "order_id"));
+ assertFalse(incrementalDumper.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "order_id"));
+ assertTrue(incrementalDumper.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "status"));
+ }
+
+ @Test
+ public void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+ assertWriteRowsEvent0(null, 3);
+ }
+
+ @Test
+ public void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException {
+ assertWriteRowsEvent0(mockTargetTableColumnsMap(), 1);
+ }
+
+ private void assertWriteRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+ dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
- rowsEvent.setAfterRows(Collections.singletonList(new String[]{"1", "order"}));
- Plugins.getMemberAccessor().invoke(
- MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+ rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
+ Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
List<Record> actual = channel.fetchRecords(1, 0);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.INSERT));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+ }
+
+ private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+ return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
}
@Test
- public void assertUpdateRowsEvent() throws ReflectiveOperationException {
+ public void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+ assertUpdateRowsEvent0(null, 3);
+ }
+
+ @Test
+ public void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException {
+ assertUpdateRowsEvent0(mockTargetTableColumnsMap(), 1);
+ }
+
+ private void assertUpdateRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+ dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
- rowsEvent.setBeforeRows(Collections.singletonList(new String[]{"1", "order_old"}));
- rowsEvent.setAfterRows(Collections.singletonList(new String[]{"1", "order_new"}));
- Plugins.getMemberAccessor().invoke(
- MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+ rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
+ Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
List<Record> actual = channel.fetchRecords(1, 0);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.UPDATE));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+ }
+
+ @Test
+ public void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
+ assertDeleteRowsEvent0(null, 3);
}
@Test
- public void assertDeleteRowsEvent() throws ReflectiveOperationException {
+ public void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException {
+ assertDeleteRowsEvent0(mockTargetTableColumnsMap(), 1);
+ }
+
+ private void assertDeleteRowsEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+ dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
- rowsEvent.setBeforeRows(Collections.singletonList(new String[]{"1", "order"}));
- Plugins.getMemberAccessor().invoke(
- MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class), incrementalDumper, rowsEvent, pipelineTableMetaData);
+ rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
+ Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
List<Record> actual = channel.fetchRecords(1, 0);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.DELETE));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
}
@Test
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index bbd59daa59a..ac5db47d1c0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -23,20 +23,20 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import java.util.List;
+import java.util.Set;
/**
* WAL event converter.
@@ -62,61 +62,59 @@ public final class WALEventConverter {
if (filter(event)) {
return createPlaceholderRecord(event);
}
+ if (!(event instanceof AbstractRowEvent)) {
+ return createPlaceholderRecord(event);
+ }
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowEvent) event).getTableName());
if (event instanceof WriteRowEvent) {
- return handleWriteRowsEvent((WriteRowEvent) event);
+ return handleWriteRowEvent((WriteRowEvent) event, tableMetaData);
}
if (event instanceof UpdateRowEvent) {
- return handleUpdateRowsEvent((UpdateRowEvent) event);
+ return handleUpdateRowEvent((UpdateRowEvent) event, tableMetaData);
}
if (event instanceof DeleteRowEvent) {
- return handleDeleteRowsEvent((DeleteRowEvent) event);
- }
- if (event instanceof PlaceholderEvent || event instanceof BeginTXEvent || event instanceof CommitTXEvent) {
- return createPlaceholderRecord(event);
+ return handleDeleteRowEvent((DeleteRowEvent) event, tableMetaData);
}
throw new UnsupportedSQLOperationException("");
}
private boolean filter(final AbstractWALEvent event) {
- if (isRowEvent(event)) {
+ if (event instanceof AbstractRowEvent) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
return !dumperConfig.containsTable(rowEvent.getTableName());
}
return false;
}
- private boolean isRowEvent(final AbstractWALEvent event) {
- return event instanceof WriteRowEvent || event instanceof UpdateRowEvent || event instanceof DeleteRowEvent;
- }
-
private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
}
- private DataRecord handleWriteRowsEvent(final WriteRowEvent writeRowEvent) {
+ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
+ return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+ }
+
+ private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
DataRecord result = createDataRecord(writeRowEvent, writeRowEvent.getAfterRow().size());
result.setType(IngestDataChangeType.INSERT);
- putColumnsIntoDataRecord(result, getPipelineTableMetaData(writeRowEvent.getTableName()), writeRowEvent.getAfterRow());
+ putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
return result;
}
- private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
- }
-
- private DataRecord handleUpdateRowsEvent(final UpdateRowEvent updateRowEvent) {
+ private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
DataRecord result = createDataRecord(updateRowEvent, updateRowEvent.getAfterRow().size());
result.setType(IngestDataChangeType.UPDATE);
- putColumnsIntoDataRecord(result, getPipelineTableMetaData(updateRowEvent.getTableName()), updateRowEvent.getAfterRow());
+ String actualTableName = updateRowEvent.getTableName();
+ putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
return result;
}
- private DataRecord handleDeleteRowsEvent(final DeleteRowEvent event) {
+ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
// TODO completion columns
DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
result.setType(IngestDataChangeType.DELETE);
// TODO Unique key may be a column within unique index
- List<String> primaryKeyColumns = getPipelineTableMetaData(event.getTableName()).getPrimaryKeyColumns();
+ List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
}
@@ -130,12 +128,21 @@ public final class WALEventConverter {
return result;
}
- private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
+ private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
+ Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(actualTableName).orElse(null);
for (int i = 0, count = values.size(); i < count; i++) {
- boolean isUniqueKey = tableMetaData.getColumnMetaData(i + 1).isUniqueKey();
+ PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
+ if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+ continue;
+ }
+ boolean isUniqueKey = columnMetaData.isUniqueKey();
Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
- Column column = new Column(tableMetaData.getColumnMetaData(i + 1).getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
+ Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
dataRecord.addColumn(column);
}
}
+
+ boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
+ return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+ }
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index dfaefecbcb9..66e9f05caf8 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -26,7 +26,10 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -39,40 +42,57 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Place
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
import org.postgresql.replication.LogSequenceNumber;
import javax.sql.DataSource;
+import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class WALEventConverterTest {
+ private DumperConfiguration dumperConfig;
+
private WALEventConverter walEventConverter;
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
- @Before
+ private PipelineTableMetaData pipelineTableMetaData;
+
+ @BeforeEach
public void setUp() {
- DumperConfiguration dumperConfig = mockDumperConfiguration();
+ dumperConfig = mockDumperConfiguration();
walEventConverter = new WALEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
initTableData(dumperConfig);
+ pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
}
- @After
+ @AfterEach
public void tearDown() {
dataSourceManager.close();
}
@@ -92,11 +112,56 @@ public final class WALEventConverterTest {
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT, status VARCHAR(12))");
+ statement.execute("INSERT INTO t_order (order_id, user_id, status) VALUES (101, 1, 'OK'), (102, 1, 'OK')");
}
}
+ private static Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
+ return mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName, Function.identity()));
+ }
+
+ private static List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
+ List<PipelineColumnMetaData> result = new LinkedList<>();
+ result.add(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "INT", false, true, true));
+ result.add(new PipelineColumnMetaData(1, "user_id", Types.INTEGER, "INT", false, false, false));
+ result.add(new PipelineColumnMetaData(1, "status", Types.VARCHAR, "VARCHAR", false, false, false));
+ return result;
+ }
+
+ @Test
+ public void assertIsColumnUnneeded() {
+ assertFalse(walEventConverter.isColumnUnneeded(null, "order_id"));
+ assertFalse(walEventConverter.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "order_id"));
+ assertTrue(walEventConverter.isColumnUnneeded(Stream.of("order_id", "user_id").map(ColumnName::new).collect(Collectors.toSet()), "status"));
+ }
+
+ @Test
+ public void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
+ assertWriteRowEvent0(null, 3);
+ }
+
+ @Test
+ public void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException {
+ assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
+ }
+
+ private void assertWriteRowEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+ dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
+ WriteRowEvent rowsEvent = new WriteRowEvent();
+ rowsEvent.setDatabaseName("");
+ rowsEvent.setTableName("t_order");
+ rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
+ Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class);
+ DataRecord actual = (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
+ assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
+ assertThat(actual.getColumnCount(), is(expectedColumnCount));
+ }
+
+ private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+ return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
+ }
+
@Test
public void assertConvertBeginTXEvent() {
BeginTXEvent beginTXEvent = new BeginTXEvent(100);
@@ -144,14 +209,16 @@ public final class WALEventConverterTest {
@Test
public void assertUnknownTable() {
- Record record = walEventConverter.convert(mockUnknownTableEvent());
- assertThat(record, instanceOf(PlaceholderRecord.class));
+ assertInstanceOf(PlaceholderRecord.class, walEventConverter.convert(mockUnknownTableEvent()));
}
@Test
public void assertConvertFailure() {
- assertThrows(UnsupportedSQLOperationException.class, () -> walEventConverter.convert(new AbstractRowEvent() {
- }));
+ AbstractRowEvent event = new AbstractRowEvent() {
+ };
+ event.setDatabaseName("");
+ event.setTableName("t_order");
+ assertThrows(UnsupportedSQLOperationException.class, () -> walEventConverter.convert(event));
}
private AbstractRowEvent mockWriteRowEvent() {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 99f10a02ca4..e0197d80ef0 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -31,10 +31,12 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -107,11 +109,14 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
PipelineDataSourceWrapper targetDataSource = dataSourceManager.getDataSource(jobConfig.getTarget());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dataNode.getSchemaName(), dataNode.getTableName());
+ ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(), dataNode.getTableName()));
+ List<String> columnNames = tableMetaData.getColumnNames();
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtil.getUniqueKeyColumns(
sourceTable.getSchemaName().getOriginal(), sourceTable.getTableName().getOriginal(), metaDataLoader);
PipelineColumnMetaData uniqueKey = uniqueKeyColumns.isEmpty() ? null : uniqueKeyColumns.get(0);
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
- jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, uniqueKey, metaDataLoader, readRateLimitAlgorithm, progressContext);
+ jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKey, readRateLimitAlgorithm, progressContext);
return singleTableInventoryChecker.check(calculateAlgorithm);
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2182776d753..9cf6cd9164d 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -158,7 +158,7 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
- primaryKeyMetaData, metaDataLoader, null, progressContext);
+ tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
assertTrue(checkResult.isMatched());
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
similarity index 81%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
index 17f312fdffd..d3df289832b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractColumnValueReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -26,10 +26,7 @@ import java.sql.SQLException;
/**
* Basic column value reader.
*/
-@RequiredArgsConstructor
-public final class BasicColumnValueReader extends AbstractColumnValueReader {
-
- private final String databaseType;
+public final class FixtureColumnValueReader extends AbstractColumnValueReader {
@Override
protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
@@ -38,6 +35,6 @@ public final class BasicColumnValueReader extends AbstractColumnValueReader {
@Override
public String getType() {
- return databaseType;
+ return "H2";
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 98f3ce69d59..080c2dbcc38 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -23,11 +23,6 @@ import java.util.Optional;
public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- @Override
- public String getType() {
- return "H2";
- }
-
@Override
protected boolean isKeyword(final String item) {
return false;
@@ -47,4 +42,9 @@ public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder
public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
return Optional.empty();
}
+
+ @Override
+ public String getType() {
+ return "H2";
+ }
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index 8c3072fdda9..2ada8ee55db 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -35,6 +36,46 @@ public final class PipelineSQLBuilderTest {
private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
+ @Test
+ public void assertBuildDivisibleInventoryDumpSQL() {
+ String actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order", Collections.singletonList("*"), "order_id");
+ assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
+ actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
+ }
+
+ @Test
+ public void assertBuildDivisibleInventoryDumpSQLNoEnd() {
+ String actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order", Collections.singletonList("*"), "order_id");
+ assertThat(actual, is("SELECT * FROM t_order WHERE order_id>=? ORDER BY order_id ASC"));
+ actual = pipelineSQLBuilder.buildDivisibleInventoryDumpSQLNoEnd(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? ORDER BY order_id ASC"));
+ }
+
+ @Test
+ public void assertBuildIndivisibleInventoryDumpSQL() {
+ String actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order", Collections.singletonList("*"), "order_id");
+ assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
+ actual = pipelineSQLBuilder.buildIndivisibleInventoryDumpSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+ assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
+ }
+
+ @Test
+ public void assertBuildQueryAllOrderingSQLFirstQuery() {
+ String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", true);
+ assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
+ actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+ assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
+ }
+
+ @Test
+ public void assertBuildQueryAllOrderingSQLNonFirstQuery() {
+ String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false);
+ assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
+ actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+ assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC"));
+ }
+
@Test
public void assertBuildInsertSQL() {
String actual = pipelineSQLBuilder.buildInsertSQL(null, mockDataRecord("t2"));
diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
new file mode 100644
index 00000000000..6d2c2645d60
--- /dev/null
+++ b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureColumnValueReader