Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/10/12 11:25:47 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r993327266


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -110,7 +111,12 @@ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm
             checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
             InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
             Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
-            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+            long recordsCount;
+            if (calculateAlgorithm instanceof CRC32MatchDataConsistencyCalculateAlgorithm) {
+                recordsCount = tableMetaData.getColumnNames().size();

Review Comment:
   1. This class is for general use, so we should not hard-code `CRC32MatchDataConsistencyCalculateAlgorithm` here.
   
   2. `DataConsistencyCalculatedResult` has a `getRecordsCount()` method, so we can get the records count for every algorithm.
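   A minimal sketch of point 2. It assumes a simplified stand-in interface `CalculatedResultView` (hypothetical, used in place of `DataConsistencyCalculatedResult`; the `long` return type is also an assumption) and a hypothetical `RecordsCountProgress` accumulator. Progress is summed from each calculated result, so the checker never needs to know which algorithm produced it.

   ```java
   // Hypothetical stand-in for DataConsistencyCalculatedResult: only getRecordsCount() is modelled.
   interface CalculatedResultView {
       long getRecordsCount();
   }
   
   // Hypothetical per-chunk result; each algorithm would supply its own implementation.
   final class ChunkResult implements CalculatedResultView {
       private final long recordsCount;
   
       ChunkResult(final long recordsCount) {
           this.recordsCount = recordsCount;
       }
   
       @Override
       public long getRecordsCount() {
           return recordsCount;
       }
   }
   
   // Hypothetical accumulator: no instanceof check on the algorithm class is needed.
   final class RecordsCountProgress {
       private long checkedRecordsCount;
   
       void onResult(final CalculatedResultView result) {
           checkedRecordsCount += result.getRecordsCount();
       }
   
       long getCheckedRecordsCount() {
           return checkedRecordsCount;
       }
   
       // Sums the records count of every result an algorithm returned.
       static long sum(final Iterable<? extends CalculatedResultView> results) {
           RecordsCountProgress progress = new RecordsCountProgress();
           for (CalculatedResultView each : results) {
               progress.onResult(each);
           }
           return progress.getCheckedRecordsCount();
       }
   }
   ```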
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -35,21 +35,23 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private final PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(new MySQLDatabaseType().getType());

Review Comment:
   We should not hard-code MySQL here.
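   A minimal sketch of resolving the builder at call time from the parameter's database type instead of pinning it to MySQL, in the spirit of the removed `PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType())` line in the next hunk. `SqlBuilder`, `SqlBuilderResolver` and the factory function are hypothetical stand-ins, not ShardingSphere APIs; builders are cached per database type with `ConcurrentHashMap.computeIfAbsent`.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;
   
   // Hypothetical stand-in for PipelineSQLBuilder: only the method this sketch needs.
   interface SqlBuilder {
       String buildCRC32SQL(String tableName, String columnName);
   }
   
   // Hypothetical resolver: one builder per database type, created lazily and reused.
   final class SqlBuilderResolver {
       private final Map<String, SqlBuilder> builders = new ConcurrentHashMap<>();
       private final Function<String, SqlBuilder> factory;
   
       SqlBuilderResolver(final Function<String, SqlBuilder> factory) {
           this.factory = factory;
       }
   
       // Look up by the database type carried in the calculate parameter,
       // rather than a field fixed to MySQL when the algorithm is constructed.
       SqlBuilder resolve(final String databaseType) {
           return builders.computeIfAbsent(databaseType, factory);
       }
   }
   ```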



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -59,13 +61,18 @@ public void init(final Properties props) {
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
-        PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        int columIndex = null == previousCalculatedResult ? 0 : (previousCalculatedResult.getColumnIndex() + 1);
+        if (columIndex >= parameter.getColumnNames().size()) {
+            return Optional.empty();
+        }
+        List<String> columnNames = new ArrayList<>(parameter.getColumnNames());
+        CalculatedItem calculatedItem = calculateCRC32(parameter, columnNames.get(columIndex));
+        return Optional.of(new CalculatedResult(1, calculatedItem.getCrc32(), columIndex));

Review Comment:
   1. We can get the real records count from `DataConsistencyCalculatedResult` instead of hard-coding `1`.
   
   2. If there are many records, calculating CRC32 over a whole column takes a long time and the progress is not updated in the meantime, so it's better to calculate CRC32 block by block.
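   A minimal in-memory sketch of point 2, assuming one column's values are already available as a list. In a real check a block would more likely be a range of the unique key (the test fixture uses `id`) queried via SQL. `ChunkedCrc32`, `BlockResult` and the progress callback are hypothetical. Combining per-row CRC32 values with XOR is also an assumption of the sketch, chosen because it is order-independent and lets block checksums be merged; note that XOR cancels out pairs of identical values, so it is a weak checksum.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongConsumer;
   import java.util.zip.CRC32;
   
   final class ChunkedCrc32 {
   
       // Result of one block: rows covered and the XOR of their CRC32 values.
       static final class BlockResult {
           final int recordsCount;
           final long checksum;
   
           BlockResult(final int recordsCount, final long checksum) {
               this.recordsCount = recordsCount;
               this.checksum = checksum;
           }
       }
   
       static long crc32(final String value) {
           CRC32 crc = new CRC32();
           crc.update(value.getBytes(StandardCharsets.UTF_8));
           return crc.getValue();
       }
   
       // Processes values in blocks of blockSize rows and reports the running
       // records count after every block, so progress moves on large tables.
       static List<BlockResult> calculate(final List<String> columnValues, final int blockSize, final LongConsumer progress) {
           if (blockSize <= 0) {
               throw new IllegalArgumentException("blockSize must be positive");
           }
           List<BlockResult> result = new ArrayList<>();
           long processed = 0;
           for (int start = 0; start < columnValues.size(); start += blockSize) {
               int end = Math.min(start + blockSize, columnValues.size());
               long checksum = 0;
               for (String each : columnValues.subList(start, end)) {
                   checksum ^= crc32(each);
               }
               result.add(new BlockResult(end - start, checksum));
               processed += end - start;
               progress.accept(processed);
           }
           return result;
       }
   }
   ```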
   



##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java:
##########
@@ -53,40 +45,17 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
     @Mock
     private PipelineDataSourceWrapper pipelineDataSource;
     
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private Connection connection;
-    
     @Before
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
         parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
                 "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
-        when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
-    @Test
-    public void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
-        when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
-        when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
+    @Test(expected = UnsupportedCRC32DataConsistencyCalculateAlgorithmException.class)
+    public void assertCalculateFailed() {

Review Comment:
   Why is it unsupported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
