Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/19 07:52:48 UTC

[shardingsphere] branch master updated: CDC E2E add single table test. (#26429)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aa01adf614d CDC E2E add single table test. (#26429)
aa01adf614d is described below

commit aa01adf614d9684a71fc40d691792c28071ebe94
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Jun 19 15:52:42 2023 +0800

    CDC E2E add single table test. (#26429)
    
    * CDC E2E add single table test.
    
    * Improve data consistency check
---
 .../pipeline/cdc/handler/CDCBackendHandler.java    | 10 +++++--
 ...SingleTableInventoryDataConsistencyChecker.java | 16 +++++------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 33 +++++++++++++---------
 3 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index d13987dd3c0..700270b84c9 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -43,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataR
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -51,6 +50,7 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
 import org.apache.shardingsphere.single.rule.SingleRule;
 
 import java.util.ArrayList;
@@ -122,7 +122,6 @@ public final class CDCBackendHandler {
     private Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
         Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
         Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
-        ShardingSpherePreconditions.checkState(shardingRule.isPresent() || singleRule.isPresent(), () -> new NoAnyRuleExistsException(database.getName()));
         Map<String, List<DataNode>> result = new HashMap<>();
         // TODO support virtual data source name
         for (String each : tableNames) {
@@ -130,7 +129,12 @@ public final class CDCBackendHandler {
                 result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
                 continue;
             }
-            shardingRule.flatMap(value -> value.findTableRule(each)).ifPresent(rule -> result.put(each, rule.getActualDataNodes()));
+            if (shardingRule.isPresent() && shardingRule.get().findTableRule(each).isPresent()) {
+                TableRule tableRule = shardingRule.get().getTableRule(each);
+                result.put(each, tableRule.getActualDataNodes());
+                continue;
+            }
+            throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
         }
         return result;
     }
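
For context on the hunk above: the previous code checked up front that at least one of ShardingRule or SingleRule existed (throwing NoAnyRuleExistsException otherwise) and then silently skipped any table the sharding rule could not route; the new code resolves data nodes table by table and fails fast with PipelineInvalidParameterException on the first table that neither rule knows about. A minimal, self-contained sketch of that resolution order, using map-backed stand-ins for the rule types (the class shapes and error message below are illustrative, not the project's API):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical stand-ins: each map plays the role of a rule's
    // "table name -> data nodes" lookup.
    final class DataNodeResolver {
        
        private final Map<String, List<String>> singleTableNodes;
        
        private final Map<String, List<String>> shardingTableNodes;
        
        DataNodeResolver(final Map<String, List<String>> singleTableNodes, final Map<String, List<String>> shardingTableNodes) {
            this.singleTableNodes = singleTableNodes;
            this.shardingTableNodes = shardingTableNodes;
        }
        
        // Mirrors the new control flow: single-table rule first, then sharding rule,
        // then fail fast for the unresolvable table instead of skipping it.
        Map<String, List<String>> buildDataNodesMap(final Collection<String> tableNames) {
            Map<String, List<String>> result = new HashMap<>();
            for (String each : tableNames) {
                if (singleTableNodes.containsKey(each)) {
                    result.put(each, new ArrayList<>(singleTableNodes.get(each)));
                    continue;
                }
                if (shardingTableNodes.containsKey(each)) {
                    result.put(each, shardingTableNodes.get(each));
                    continue;
                }
                throw new IllegalArgumentException(String.format("Can not find actual data nodes of `%s`", each));
            }
            return result;
        }
    }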
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index a126d1d942d..37f6ee9f118 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -116,26 +116,26 @@ public final class SingleTableInventoryDataConsistencyChecker {
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
-        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
+        while (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
             if (null != readRateLimitAlgorithm) {
                 readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
             }
-            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
-            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
-            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
-            targetRecordsCount += targetCalculatedResult.getRecordsCount();
+            DataConsistencyCalculatedResult sourceCalculatedResult = sourceCalculatedResults.hasNext() ? waitFuture(executor.submit(sourceCalculatedResults::next)) : null;
+            DataConsistencyCalculatedResult targetCalculatedResult = targetCalculatedResults.hasNext() ? waitFuture(executor.submit(targetCalculatedResults::next)) : null;
+            sourceRecordsCount += null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += null == targetCalculatedResult ? 0 : targetCalculatedResult.getRecordsCount();
             contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
             if (!contentMatched) {
                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (null != sourceCalculatedResult && sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (null != targetCalculatedResult && targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount()));
         }
         return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
     }
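
The loop-condition change above (`&&` to `||`) is the "Improve data consistency check" half of this commit: with `&&`, the comparison stopped as soon as the shorter side's iterator was exhausted, so trailing batches on the longer side were never counted or compared and a record-count mismatch could slip through. With `||` plus the null-tolerant handling, the longer side keeps draining, and Objects.equals flags a leftover batch compared against null as a content mismatch. A minimal sketch of the pattern using plain iterators of integer batches (class and method names here are illustrative):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Objects;
    
    public final class DrainBothSides {
        
        static boolean contentMatched(final Iterator<List<Integer>> source, final Iterator<List<Integer>> target) {
            while (source.hasNext() || target.hasNext()) {
                List<Integer> sourceBatch = source.hasNext() ? source.next() : null;
                List<Integer> targetBatch = target.hasNext() ? target.next() : null;
                // Objects.equals tolerates the null on the exhausted side:
                // a real batch compared against null is a mismatch.
                if (!Objects.equals(sourceBatch, targetBatch)) {
                    return false;
                }
            }
            return true;
        }
        
        public static void main(final String[] args) {
            Iterator<List<Integer>> source = List.of(List.of(1, 2), List.of(3)).iterator();
            Iterator<List<Integer>> target = List.of(List.of(1, 2)).iterator();
            // Prints false; the old `&&` condition would have reported a match here.
            System.out.println(contentMatched(source, target));
        }
    }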
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 6f1adabbdee..38e5864c551 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -63,7 +63,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -114,6 +113,7 @@ class CDCE2EIT {
             Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             log.info("init data begin: {}", LocalDateTime.now());
             DataSourceExecuteUtils.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
+            DataSourceExecuteUtils.execute(containerComposer.getProxyDataSource(), "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
             log.info("init data end: {}", LocalDateTime.now());
             try (
                     Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
@@ -134,20 +134,15 @@ class CDCE2EIT {
             }
             Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS)
                     .until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(containerComposer)).size() == actualProxyList.size());
-            SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
+            SchemaTableName orderSchemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
                     ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_NAME))
                     : new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_NAME));
+            PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
             PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4),
                     containerComposer.getDatabaseType());
-            PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
-            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
-            PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
-            PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
-            ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
-            SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
-                    tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
-            DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
-            assertTrue(checkResult.isMatched());
+            assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
+            assertDataMatched(new PipelineDataSourceWrapper(containerComposer.getProxyDataSource(), containerComposer.getDatabaseType()), targetDataSource,
+                    new SchemaTableName(new SchemaName(null), new TableName("t_address")));
             containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
             assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         }
@@ -160,8 +155,9 @@ class CDCE2EIT {
     private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
         containerComposer.createSchema(connection, sleepSeconds);
         String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
-        log.info("create table sql: {}", sql);
+        log.info("Create table sql: {}", sql);
         connection.createStatement().execute(sql);
+        connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
         if (sleepSeconds > 0) {
             Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true);
         }
@@ -183,7 +179,7 @@ class CDCE2EIT {
         // TODO add full=false test case later
         parameter.setFull(true);
         String schema = containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
-        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build()));
+        parameter.setSchemaTables(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(), SchemaTable.newBuilder().setTable("t_address").build()));
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
             if (null != throwable) {
@@ -205,6 +201,17 @@ class CDCE2EIT {
         return containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
     }
     
+    private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) {
+        StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), schemaTableName.getTableName().getOriginal());
+        PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+        ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
+        SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
+                tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
+        DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
+        assertTrue(checkResult.isMatched());
+    }
+    
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
     }
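
The assertDataMatched helper extracted above wraps SingleTableInventoryDataConsistencyChecker, which conceptually reads both tables ordered by the unique key and compares record counts and contents. A rough JDBC illustration of that comparison for the new t_address table (a simplification over plain connections, assuming the two-column schema created in initSchemaAndTable; this is not the checker's actual implementation):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Objects;
    
    public final class TableMatchSketch {
        
        static boolean tablesMatch(final Connection source, final Connection target) throws SQLException {
            String query = "SELECT id, address_name FROM t_address ORDER BY id";
            try (Statement sourceStmt = source.createStatement();
                 Statement targetStmt = target.createStatement();
                 ResultSet sourceRs = sourceStmt.executeQuery(query);
                 ResultSet targetRs = targetStmt.executeQuery(query)) {
                while (true) {
                    boolean sourceHas = sourceRs.next();
                    boolean targetHas = targetRs.next();
                    if (sourceHas != targetHas) {
                        // One side has more rows: record count mismatch.
                        return false;
                    }
                    if (!sourceHas) {
                        // Both sides exhausted with all rows equal.
                        return true;
                    }
                    if (sourceRs.getInt("id") != targetRs.getInt("id")
                            || !Objects.equals(sourceRs.getString("address_name"), targetRs.getString("address_name"))) {
                        // Same position, different values: content mismatch.
                        return false;
                    }
                }
            }
        }
    }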