Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/19 07:52:48 UTC
[shardingsphere] branch master updated: CDC E2E add single table test. (#26429)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aa01adf614d CDC E2E add single table test. (#26429)
aa01adf614d is described below
commit aa01adf614d9684a71fc40d691792c28071ebe94
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Jun 19 15:52:42 2023 +0800
CDC E2E add single table test. (#26429)
* CDC E2E add single table test.
* Improve data consistency check
---
.../pipeline/cdc/handler/CDCBackendHandler.java | 10 +++++--
...SingleTableInventoryDataConsistencyChecker.java | 16 +++++------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 33 +++++++++++++---------
3 files changed, 35 insertions(+), 24 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index d13987dd3c0..700270b84c9 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -43,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataR
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -51,6 +50,7 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
import org.apache.shardingsphere.single.rule.SingleRule;
import java.util.ArrayList;
@@ -122,7 +122,6 @@ public final class CDCBackendHandler {
private Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
- ShardingSpherePreconditions.checkState(shardingRule.isPresent() || singleRule.isPresent(), () -> new NoAnyRuleExistsException(database.getName()));
Map<String, List<DataNode>> result = new HashMap<>();
// TODO support virtual data source name
for (String each : tableNames) {
@@ -130,7 +129,12 @@ public final class CDCBackendHandler {
result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
continue;
}
- shardingRule.flatMap(value -> value.findTableRule(each)).ifPresent(rule -> result.put(each, rule.getActualDataNodes()));
+ if (shardingRule.isPresent() && shardingRule.get().findTableRule(each).isPresent()) {
+ TableRule tableRule = shardingRule.get().getTableRule(each);
+ result.put(each, tableRule.getActualDataNodes());
+ continue;
+ }
+ throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
}
return result;
}
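
Note on the hunk above: it replaces the up-front NoAnyRuleExistsException check with a per-table failure. Single tables resolve through SingleRule, sharded tables through their TableRule, and a table that matches neither now raises PipelineInvalidParameterException for that table only, instead of rejecting the whole request when the database has no rules at all. The guard on the single-table branch sits in an unchanged context line outside the hunk, so the sketch below assumes a containsKey check there; it is a condensed restatement of the new control flow, not the verbatim method:

    for (String each : tableNames) {
        // Single tables map straight to their data nodes (guard assumed, see note above).
        if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
            result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
            continue;
        }
        // Sharded tables expand to all of their actual data nodes.
        if (shardingRule.isPresent() && shardingRule.get().findTableRule(each).isPresent()) {
            result.put(each, shardingRule.get().getTableRule(each).getActualDataNodes());
            continue;
        }
        // No rule covers this table: fail per table rather than up front.
        throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
    }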
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index a126d1d942d..37f6ee9f118 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -116,26 +116,26 @@ public final class SingleTableInventoryDataConsistencyChecker {
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
- while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
+ while (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
if (null != readRateLimitAlgorithm) {
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
- DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
- DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
- sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
- targetRecordsCount += targetCalculatedResult.getRecordsCount();
+ DataConsistencyCalculatedResult sourceCalculatedResult = sourceCalculatedResults.hasNext() ? waitFuture(executor.submit(sourceCalculatedResults::next)) : null;
+ DataConsistencyCalculatedResult targetCalculatedResult = targetCalculatedResults.hasNext() ? waitFuture(executor.submit(targetCalculatedResults::next)) : null;
+ sourceRecordsCount += null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount();
+ targetRecordsCount += null == targetCalculatedResult ? 0 : targetCalculatedResult.getRecordsCount();
contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
if (!contentMatched) {
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
- if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ if (null != sourceCalculatedResult && sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
- if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ if (null != targetCalculatedResult && targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get());
}
- progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount()));
}
return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
}
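
This hunk is the substance of the "Improve data consistency check" item. With the old `&&` condition, the loop ended as soon as either iterator was exhausted, so a target table holding extra chunks beyond the source's last one could still pass the content check. With `||` plus the null-tolerant reads, the exhausted side yields null, Objects.equals(chunk, null) evaluates to false, and the mismatch is reported. A minimal, self-contained illustration of the pairing pattern (class name is illustrative; plain string chunks stand in for DataConsistencyCalculatedResult):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Objects;

    public final class PairedIterationDemo {

        public static void main(final String[] args) {
            Iterator<String> source = List.of("chunk-1", "chunk-2").iterator();
            Iterator<String> target = List.of("chunk-1", "chunk-2", "chunk-3").iterator();
            boolean contentMatched = true;
            // Old condition: source.hasNext() && target.hasNext() -- stops after
            // chunk-2 and would report a (false) match here.
            while (source.hasNext() || target.hasNext()) {
                String sourceChunk = source.hasNext() ? source.next() : null;
                String targetChunk = target.hasNext() ? target.next() : null;
                contentMatched = Objects.equals(sourceChunk, targetChunk);
                if (!contentMatched) {
                    break;
                }
            }
            // Prints false: the extra target chunk no longer goes unnoticed.
            System.out.println(contentMatched);
        }
    }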
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 6f1adabbdee..38e5864c551 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -63,7 +63,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -114,6 +113,7 @@ class CDCE2EIT {
Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
log.info("init data begin: {}", LocalDateTime.now());
DataSourceExecuteUtils.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
+ DataSourceExecuteUtils.execute(containerComposer.getProxyDataSource(), "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
log.info("init data end: {}", LocalDateTime.now());
try (
Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
@@ -134,20 +134,15 @@ class CDCE2EIT {
}
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS)
.until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(containerComposer)).size() == actualProxyList.size());
- SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
+ SchemaTableName orderSchemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_NAME))
: new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_NAME));
+ PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4),
containerComposer.getDatabaseType());
- PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
- StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
- PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
- ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
- SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
- tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
- DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
- assertTrue(checkResult.isMatched());
+ assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
+ assertDataMatched(new PipelineDataSourceWrapper(containerComposer.getProxyDataSource(), containerComposer.getDatabaseType()), targetDataSource,
+ new SchemaTableName(new SchemaName(null), new TableName("t_address")));
containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
}
@@ -160,8 +155,9 @@ class CDCE2EIT {
private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
containerComposer.createSchema(connection, sleepSeconds);
String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
- log.info("create table sql: {}", sql);
+ log.info("Create table sql: {}", sql);
connection.createStatement().execute(sql);
+ connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
if (sleepSeconds > 0) {
Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true);
}
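
Alongside the existing t_order setup, the test now creates and seeds a plain (non-sharded) t_address table so the streaming job and the consistency check cover a single table as well. The seeding earlier in the test goes through the project's DataSourceExecuteUtils batch helper; below is a self-contained plain-JDBC equivalent of that create-and-seed step (the H2 in-memory URL is purely a stand-in for the proxy data source used by the test):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Arrays;
    import java.util.List;

    public final class SeedAddressTableDemo {

        public static void main(final String[] args) throws SQLException {
            List<Object[]> rows = Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"});
            try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:demo")) {
                // Same DDL as the test adds in initSchemaAndTable.
                connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
                // Batched parameterized insert, mirroring the seeding step of the test.
                try (PreparedStatement statement = connection.prepareStatement("INSERT INTO t_address(id, address_name) VALUES (?,?)")) {
                    for (Object[] each : rows) {
                        statement.setObject(1, each[0]);
                        statement.setObject(2, each[1]);
                        statement.addBatch();
                    }
                    statement.executeBatch();
                }
            }
        }
    }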
@@ -183,7 +179,7 @@ class CDCE2EIT {
// TODO add full=false test case later
parameter.setFull(true);
String schema = containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
- parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build()));
+ parameter.setSchemaTables(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(), SchemaTable.newBuilder().setTable("t_address").build()));
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
if (null != throwable) {
@@ -205,6 +201,17 @@ class CDCE2EIT {
return containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
}
+ private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) {
+ StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), schemaTableName.getTableName().getOriginal());
+ PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
+ SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
+ tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
+ DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
+ assertTrue(checkResult.isMatched());
+ }
+
private static boolean isEnabled() {
return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
}