You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/10/12 10:54:24 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #21526: Improve crc32 match data consistency check progress

azexcy opened a new pull request, #21526:
URL: https://github.com/apache/shardingsphere/pull/21526

   
   
   Changes proposed in this pull request:
     - Improve crc32 match data consistency check progress
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `mvn clean install -B -T2C -DskipTests -Dmaven.javadoc.skip=true -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy closed pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy closed pull request #21526: Improve crc32 match data consistency check progress
URL: https://github.com/apache/shardingsphere/pull/21526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994603771


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   relation issue #21540, maybe we can improve later?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#issuecomment-1328598434

   The current branch already has a lot of refactoring and a lot of conflicts, so it was closed first


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994645969


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   Ok, I'll try another solution



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994574548


##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java:
##########
@@ -76,6 +76,11 @@ public String buildChunkedQuerySQL(final String schemaName, final String tableNa
         return "";
     }
     
+    @Override
+    public String buildChunkedQueryUniqueKeySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+        return String.format("SELECT %s FROM %s ORDER BY %s ASC LIMIT ?", uniqueKey, tableName, uniqueKey);
+    }

Review Comment:
   Changed to "", just use to Just to adapt unit tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r993327266


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -110,7 +111,12 @@ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm
             checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
             InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
             Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
-            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+            long recordsCount;
+            if (calculateAlgorithm instanceof CRC32MatchDataConsistencyCalculateAlgorithm) {
+                recordsCount = tableMetaData.getColumnNames().size();

Review Comment:
   1, This class is for common usage, could not hard-code `CRC32MatchDataConsistencyCalculateAlgorithm` here.
   
   2, `DataConsistencyCalculatedResult` has `getRecordsCount()` method, we could get records count for every algorithm.
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -35,21 +35,23 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private final PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(new MySQLDatabaseType().getType());

Review Comment:
   Could not hard-code with MySQL



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -59,13 +61,18 @@ public void init(final Properties props) {
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
-        PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        int columIndex = null == previousCalculatedResult ? 0 : (previousCalculatedResult.getColumnIndex() + 1);
+        if (columIndex >= parameter.getColumnNames().size()) {
+            return Optional.empty();
+        }
+        List<String> columnNames = new ArrayList<>(parameter.getColumnNames());
+        CalculatedItem calculatedItem = calculateCRC32(parameter, columnNames.get(columIndex));
+        return Optional.of(new CalculatedResult(1, calculatedItem.getCrc32(), columIndex));

Review Comment:
   1, We could get real records count from DataConsistencyCalculatedResult, but not hard-coded `1`.
   
   2, If there's much records, calculate on every column will cost much time, the progress is still not updated, so it's better to calculate crc32 block by block.
   



##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java:
##########
@@ -53,40 +45,17 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
     @Mock
     private PipelineDataSourceWrapper pipelineDataSource;
     
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private Connection connection;
-    
     @Before
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
         parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
                 "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
-        when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
-    @Test
-    public void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
-        when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
-        when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
+    @Test(expected = UnsupportedCRC32DataConsistencyCalculateAlgorithmException.class)
+    public void assertCalculateFailed() {

Review Comment:
   Why is it unsupported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] codecov-commenter commented on pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#issuecomment-1276073179

   # [Codecov](https://codecov.io/gh/apache/shardingsphere/pull/21526?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#21526](https://codecov.io/gh/apache/shardingsphere/pull/21526?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f121d1b) into [master](https://codecov.io/gh/apache/shardingsphere/commit/248b7eab75d9525c2633d075bd4dc197f73d2e17?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (248b7ea) will **decrease** coverage by `0.01%`.
   > The diff coverage is `33.33%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #21526      +/-   ##
   ============================================
   - Coverage     61.15%   61.14%   -0.02%     
   - Complexity     2510     2512       +2     
   ============================================
     Files          4109     4109              
     Lines         57121    57128       +7     
     Branches       7559     7562       +3     
   ============================================
   - Hits          34934    34930       -4     
   - Misses        19253    19262       +9     
   - Partials       2934     2936       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/shardingsphere/pull/21526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...cy/SingleTableInventoryDataConsistencyChecker.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2NoZWNrL2NvbnNpc3RlbmN5L1NpbmdsZVRhYmxlSW52ZW50b3J5RGF0YUNvbnNpc3RlbmN5Q2hlY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...m/CRC32MatchDataConsistencyCalculateAlgorithm.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2NoZWNrL2NvbnNpc3RlbmN5L2FsZ29yaXRobS9DUkMzMk1hdGNoRGF0YUNvbnNpc3RlbmN5Q2FsY3VsYXRlQWxnb3JpdGhtLmphdmE=) | `29.16% <41.66%> (-24.33%)` | :arrow_down: |
   | [...bleDataConsistencyCheckLoadingFailedException.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2V4Y2VwdGlvbi9kYXRhL1BpcGVsaW5lVGFibGVEYXRhQ29uc2lzdGVuY3lDaGVja0xvYWRpbmdGYWlsZWRFeGNlcHRpb24uamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...thm/AbstractDataConsistencyCalculateAlgorithm.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2NoZWNrL2NvbnNpc3RlbmN5L2FsZ29yaXRobS9BYnN0cmFjdERhdGFDb25zaXN0ZW5jeUNhbGN1bGF0ZUFsZ29yaXRobS5qYXZh) | `10.00% <0.00%> (-10.00%)` | :arrow_down: |
   | [...line/core/sqlbuilder/OraclePipelineSQLBuilder.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL3NxbGJ1aWxkZXIvT3JhY2xlUGlwZWxpbmVTUUxCdWlsZGVyLmphdmE=) | `10.52% <0.00%> (+5.26%)` | :arrow_up: |
   | [...actStreamingDataConsistencyCalculateAlgorithm.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2NoZWNrL2NvbnNpc3RlbmN5L2FsZ29yaXRobS9BYnN0cmFjdFN0cmVhbWluZ0RhdGFDb25zaXN0ZW5jeUNhbGN1bGF0ZUFsZ29yaXRobS5qYXZh) | `30.00% <0.00%> (+25.00%)` | :arrow_up: |
   | [...ine/core/sqlbuilder/DefaultPipelineSQLBuilder.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL3NxbGJ1aWxkZXIvRGVmYXVsdFBpcGVsaW5lU1FMQnVpbGRlci5qYXZh) | `100.00% <0.00%> (+50.00%)` | :arrow_up: |
   | [...C32DataConsistencyCalculateAlgorithmException.java](https://codecov.io/gh/apache/shardingsphere/pull/21526/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jb3JlL2V4Y2VwdGlvbi9kYXRhL1Vuc3VwcG9ydGVkQ1JDMzJEYXRhQ29uc2lzdGVuY3lDYWxjdWxhdGVBbGdvcml0aG1FeGNlcHRpb24uamF2YQ==) | `100.00% <0.00%> (+100.00%)` | :arrow_up: |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994526630


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;

Review Comment:
   Default chunk size could be greater, e.g. 10_0000



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }
+            return maxUniqueKeyValue;
+        } catch (final SQLException ex) {
+            log.error("get max unique key value failed", ex);
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
+        }
+    }
+    
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName, final Object beginId, final Object endId) {
         String logicTableName = parameter.getLogicTableName();
         String schemaName = parameter.getSchemaName();
-        Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
-        ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
-        return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
+        String cacheKey = "crc32-" + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.get(cacheKey);
+        if (null == sql) {
+            Optional<String> optional = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName, parameter.getUniqueKey().getName());
+            ShardingSpherePreconditions.checkState(optional.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
+            sql = optional.get();
+            sqlCache.put(cacheKey, sql);
+        }
+        return calculateCRC32(parameter.getDataSource(), logicTableName, sql, beginId, endId);
     }
     
-    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
+    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql, final Object beginId, final Object endId) {
         try (

Review Comment:
   The second `calculateCRC32` method could be merged into the first one



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   It's not efficient enough to get maximum value of unique key. Could we query `max(uniqueKey)` in SQL?



##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java:
##########
@@ -76,6 +76,11 @@ public String buildChunkedQuerySQL(final String schemaName, final String tableNa
         return "";
     }
     
+    @Override
+    public String buildChunkedQueryUniqueKeySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+        return String.format("SELECT %s FROM %s ORDER BY %s ASC LIMIT ?", uniqueKey, tableName, uniqueKey);
+    }

Review Comment:
   If it's not necessary to return real SQL, then we could remove it, or else we need to update it



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java:
##########
@@ -138,6 +138,17 @@ default Optional<String> buildCreateSchemaSQL(String schemaName) {
      */
     String buildChunkedQuerySQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
     
+    /**
+     * Build query unique key SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key, it may be primary key, not null
+     * @param firstQuery first query
+     * @return query unique key SQL
+     */
+    String buildChunkedQueryUniqueKeySQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);

Review Comment:
   Method name could be improved, e.g. getMaxUniqueKeyValue



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }
+            return maxUniqueKeyValue;
+        } catch (final SQLException ex) {
+            log.error("get max unique key value failed", ex);
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
+        }
+    }
+    
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName, final Object beginId, final Object endId) {
         String logicTableName = parameter.getLogicTableName();
         String schemaName = parameter.getSchemaName();
-        Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
-        ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
-        return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
+        String cacheKey = "crc32-" + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.get(cacheKey);
+        if (null == sql) {
+            Optional<String> optional = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName, parameter.getUniqueKey().getName());
+            ShardingSpherePreconditions.checkState(optional.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
+            sql = optional.get();
+            sqlCache.put(cacheKey, sql);
+        }

Review Comment:
   It could be extracted as `getQuerySQL` method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994246774


##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java:
##########
@@ -53,40 +45,17 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
     @Mock
     private PipelineDataSourceWrapper pipelineDataSource;
     
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private Connection connection;
-    
     @Before
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
         parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
                 "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
-        when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
-    @Test
-    public void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
-        when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
-        when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
+    @Test(expected = UnsupportedCRC32DataConsistencyCalculateAlgorithmException.class)
+    public void assertCalculateFailed() {

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994593114


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   `SELECT max(id) FROM t_order ORDER BY id ASC LIMIT 1` the limit not effiect when use `max()`, 
   and sub query `SELECT MAX(id) FROM (SELECT id FROM t_order ORDER BY id ASC LIMIT 1) t;` is different now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
azexcy commented on PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#issuecomment-1275972734

   now crc32 match progress just like
   
   ```
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +--------+--------+---------------------+-------------------+------------------+----------------+------------------+---------------+
   | tables | result | finished_percentage | remaining_seconds | check_begin_time | check_end_time | duration_seconds | error_message |
   +--------+--------+---------------------+-------------------+------------------+----------------+------------------+---------------+
   |        | false  | 0                   | NULL              |                  |                | NULL             |               |
   +--------+--------+---------------------+-------------------+------------------+----------------+------------------+---------------+
   1 row in set (0.01 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | t_order | false  | 14                  | 44                | 2022-10-12 16:43:25.284 |                | 7                |               |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   1 row in set (0.01 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | t_order | false  | 28                  | 25                | 2022-10-12 16:43:25.284 |                | 10               |               |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   1 row in set (0.05 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | t_order | false  | 42                  | 16                | 2022-10-12 16:43:25.284 |                | 12               |               |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   1 row in set (0.01 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | t_order | false  | 71                  | 5                 | 2022-10-12 16:43:25.284 |                | 14               |               |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   1 row in set (0.03 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   | t_order | false  | 85                  | 3                 | 2022-10-12 16:43:25.284 |                | 18               |               |
   +---------+--------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
   1 row in set (0.02 sec)
   
   mysql> show  MIGRATION check status 'j01019761922f8fab5ce3c61496c22f733586';
   +---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
   | tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time          | duration_seconds | error_message |
   +---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
   | t_order | true   | 100                 | 0                 | 2022-10-12 16:43:25.284 | 2022-10-12 16:43:44.615 | 19               |               |
   +---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
   1 row in set (0.02 sec)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21526: Improve crc32 match data consistency check progress

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994611870


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   It might hurt performance too much, if check crc32 block by block just benefit for check progress, it might not worth to do it.
   
   So we need try to find another way, or else just give up it for now.
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org