You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/21 10:18:31 UTC

[shardingsphere] branch master updated: Fix sonar issues in pipeline (#26487)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f55ce2a94c3 Fix sonar issues in pipeline (#26487)
f55ce2a94c3 is described below

commit f55ce2a94c34481b3cf8ed30f7e2a09d9bfde482
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Jun 21 18:18:24 2023 +0800

    Fix sonar issues in pipeline (#26487)
    
    * Move comment
    
    * Reduce Cognitive Complexity of SingleTableInventoryDataConsistencyChecker.check0
---
 ...SingleTableInventoryDataConsistencyChecker.java | 23 ++++++++++++++--------
 .../CoordinatorRegistryCenterInitializer.java      |  2 +-
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 37f6ee9f118..2bb9b58429a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -116,26 +116,33 @@ public final class SingleTableInventoryDataConsistencyChecker {
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
-        while (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
+        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
             if (null != readRateLimitAlgorithm) {
                 readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
             }
-            DataConsistencyCalculatedResult sourceCalculatedResult = sourceCalculatedResults.hasNext() ? waitFuture(executor.submit(sourceCalculatedResults::next)) : null;
-            DataConsistencyCalculatedResult targetCalculatedResult = targetCalculatedResults.hasNext() ? waitFuture(executor.submit(targetCalculatedResults::next)) : null;
-            sourceRecordsCount += null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount();
-            targetRecordsCount += null == targetCalculatedResult ? 0 : targetCalculatedResult.getRecordsCount();
+            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
+            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
+            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += targetCalculatedResult.getRecordsCount();
             contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
             if (!contentMatched) {
                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (null != sourceCalculatedResult && sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            if (null != targetCalculatedResult && targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(null == sourceCalculatedResult ? 0 : sourceCalculatedResult.getRecordsCount()));
+            progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+        }
+        if (sourceCalculatedResults.hasNext()) {
+            // TODO Refactor DataConsistencyCalculatedResult to represent inaccurate number
+            return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount + 1, targetRecordsCount), new DataConsistencyContentCheckResult(false));
+        }
+        if (targetCalculatedResults.hasNext()) {
+            return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount + 1), new DataConsistencyContentCheckResult(false));
         }
         return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
index 36db76a53f1..27e0068f42e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
@@ -47,8 +47,8 @@ public final class CoordinatorRegistryCenterInitializer {
         return result;
     }
     
-    // TODO Merge registry center code in ElasticJob and ShardingSphere mode; Use spi to load impl;
     private ZookeeperConfiguration getZookeeperConfig(final ClusterPersistRepositoryConfiguration repositoryConfig, final String namespaceRelativePath) {
+        // TODO Merge registry center code in ElasticJob and ShardingSphere mode; Use SPI to load impl
         Properties props = repositoryConfig.getProps();
         ZookeeperProperties zookeeperProps = new ZookeeperProperties(props);
         String namespace = repositoryConfig.getNamespace() + (null != namespaceRelativePath ? namespaceRelativePath : "");