Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/10/10 13:22:40 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21441: Refactor show migration check status DistSQL implementation

sandynz commented on code in PR #21441:
URL: https://github.com/apache/shardingsphere/pull/21441#discussion_r991202520


##########
features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java:
##########
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
-        Map<String, DataConsistencyCheckResult> consistencyCheckResult = JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
-        List<Collection<Object>> result = new ArrayList<>(consistencyCheckResult.size());
-        for (Entry<String, DataConsistencyCheckResult> entry : consistencyCheckResult.entrySet()) {
-            DataConsistencyCheckResult value = entry.getValue();
-            DataConsistencyCountCheckResult countCheckResult = value.getCountCheckResult();
-            result.add(Arrays.asList(entry.getKey(), countCheckResult.getSourceRecordsCount(), countCheckResult.getTargetRecordsCount(), String.valueOf(countCheckResult.isMatched()),
-                    String.valueOf(value.getContentCheckResult().isMatched())));
-        }
+        ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+        List<Collection<Object>> result = new LinkedList<>();
+        String checkResult = null == progressInfo.getCheckResult() ? "" : progressInfo.getCheckResult().toString();
+        result.add(Arrays.asList(progressInfo.getTableName(), checkResult, String.valueOf(progressInfo.getInventoryFinishedPercentage()),
+                ObjectUtils.defaultIfNull(progressInfo.getRemainingTime(), ""), progressInfo.getCheckBeginTime(), ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+                ObjectUtils.defaultIfNull(progressInfo.getCheckDuration(), ""), progressInfo.getErrorMessage()));
         data = result.iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("table_name", "source_records_count", "target_records_count", "records_count_matched", "records_content_matched");
+        return Arrays.asList("table_name", "check_result", "inventory_finished_percentage", "remaining_time", "check_begin_time", "check_end_time", "check_duration", "error_message");

Review Comment:
   Column names could be improved:
   - Remove `check_` prefix
   - `inventory_finished_percentage` could be `finished_percentage`
   - `remaining_time` could be `remaining_seconds`
   - `check_duration` could be `duration_seconds`
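
   One possible reading of the renames above, as a sketch only. The class name `RenamedColumns` is hypothetical, and `table_name` and `error_message` are assumed to stay unchanged:

```java
import java.util.Arrays;
import java.util.List;

public final class RenamedColumns {
    
    // Column names after applying the four suggestions above.
    // Assumption: "table_name" and "error_message" are kept as they are in the PR.
    public static List<String> columnNames() {
        return Arrays.asList("table_name", "result", "finished_percentage", "remaining_seconds",
                "begin_time", "end_time", "duration_seconds", "error_message");
    }
    
    public static void main(final String[] args) {
        System.out.println(columnNames());
    }
}
```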
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class SingleTableInventoryDataConsistencyChecker {
      * Data consistency check.
      *
      * @param calculateAlgorithm calculate algorithm
+     * @param consistencyCheckJobItemContext job progress listener
      * @return data consistency check result
      */
-    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
-            return check(calculateAlgorithm, executor);
+            return check(calculateAlgorithm, executor, consistencyCheckJobItemContext);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
         }
     }
     
-    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
+                                             final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
         String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = targetDataSource.getDatabaseType().getType();
         String sourceTableName = sourceTable.getTableName().getOriginal();
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), sourceTableName);
+        consistencyCheckJobItemContext.setTableName(sourceTableName);
+        String schemeName = sourceTable.getSchemaName().getOriginal();
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemeName, sourceTableName);
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
-                sourceDataSource, sourceTable.getSchemaName().getOriginal(), sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+                sourceDataSource, schemeName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
         DataConsistencyCalculateParameter targetParameter = buildParameter(
                 targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+        executor.submit(() -> {
+            // TODO use select count may take too long and cause a timeout
+            long sourceRecordsCount = count(sourceDataSource, schemeName, sourceTableName, sourceDataSource.getDatabaseType());
+            consistencyCheckJobItemContext.setRecordsCount(sourceRecordsCount);
+        });

Review Comment:
   1. `executor` has at most `2` threads and a blocking queue of capacity `2`, so a third submitted task might be blocked and might time out (60 seconds).
   
   2. Could we reuse the records count from `InventoryIncrementalJobItemContext.getProcessedRecordsCount()`? Though it is not totally accurate, especially when the user runs a consistency check before the inventory dump is done.
   
   3. If option 2 is not feasible, we could calculate the records count in a blocking way for now.
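
   To make point 1 concrete, here is a minimal standalone sketch of how a pool with these settings behaves. It is not the checker's code. The class and method names are hypothetical. With two workers busy, further tasks wait in the queue, and once the queue is full the default handler rejects new submissions. In this constructor the `60` is the keep-alive time for idle threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class SmallPoolDemo {
    
    // Submits up to taskCount tasks that block until released, using the same pool settings
    // as the diff (2 core threads, 2 max threads, queue capacity 2).
    // Returns how many submissions were accepted before the first rejection.
    public static int acceptedTasks(final int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    executor.submit(() -> {
                        try {
                            // Keep the worker busy so later tasks must queue.
                            release.await();
                        } catch (final InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (final RejectedExecutionException ex) {
                    // Both workers are busy and the queue is full.
                    break;
                }
            }
            return accepted;
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }
    
    public static void main(final String[] args) {
        System.out.println("accepted of 3: " + acceptedTasks(3));
        System.out.println("accepted of 5: " + acceptedTasks(5));
    }
}
```

   A third task is accepted but only starts once one of the two workers is free, which is why a long-running `select count` sharing this pool can delay the calculation tasks.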
   



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java:
##########
@@ -97,17 +98,19 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
      * Do data consistency check.
      *
      * @param jobId job id
+     * @param jobProgressListener job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, PipelineJobProgressListener jobProgressListener);
     
     /**
      * Do data consistency check.
      *
      * @param jobId job id
      * @param algorithmType algorithm type
      * @param algorithmProps algorithm props. Nullable
+     * @param jobProgressListener consistency check job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps, PipelineJobProgressListener jobProgressListener);

Review Comment:
   Could we remove the 2nd parameter `PipelineJobProgressListener jobProgressListener`, since it's not necessary?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -76,32 +84,41 @@ public final class SingleTableInventoryDataConsistencyChecker {
      * Data consistency check.
      *
      * @param calculateAlgorithm calculate algorithm
+     * @param consistencyCheckJobItemContext job progress listener
      * @return data consistency check result
      */
-    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {

Review Comment:
   `ConsistencyCheckJobItemContext consistencyCheckJobItemContext` could be a class field rather than a method parameter.
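
   A rough sketch of the shape this would take, with hypothetical stand-in types (`ItemContext` and `Checker` are not the real ShardingSphere classes): the context is passed once to the constructor, so neither `check` overload needs it as an argument.

```java
public final class FieldInsteadOfParameter {
    
    // Hypothetical stand-in for ConsistencyCheckJobItemContext.
    public static final class ItemContext {
        
        private String tableName;
        
        public void setTableName(final String tableName) {
            this.tableName = tableName;
        }
        
        public String getTableName() {
            return tableName;
        }
    }
    
    // Hypothetical stand-in for the checker; the context is injected once and kept as a field.
    public static final class Checker {
        
        private final String sourceTableName;
        
        private final ItemContext checkJobItemContext;
        
        public Checker(final String sourceTableName, final ItemContext checkJobItemContext) {
            this.sourceTableName = sourceTableName;
            this.checkJobItemContext = checkJobItemContext;
        }
        
        // No context parameter: the method reads the field instead.
        public boolean check() {
            checkJobItemContext.setTableName(sourceTableName);
            return true;
        }
    }
    
    public static void main(final String[] args) {
        ItemContext context = new ItemContext();
        new Checker("t_order", context).check();
        System.out.println(context.getTableName());
    }
}
```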



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java:
##########
@@ -241,8 +243,9 @@ public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConf
     }
     
     @Override
-    protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext);
+    protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+                                                                                 final PipelineJobProgressListener jobProgressListener) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, (ConsistencyCheckJobItemContext) jobProgressListener);

Review Comment:
   The class cast is strange; it would be better to remove it.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
         stop(checkLatestJobId.get());
     }
     
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        int inventoryFinishedPercentage;
+        LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+        if (null != jobItemProgress.getRecordsCount() && Objects.equals(jobItemProgress.getCheckedRecordsCount(), jobItemProgress.getRecordsCount())) {
+            inventoryFinishedPercentage = 100;
+            LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
+            Duration duration = Duration.between(checkBeginTime, checkEndTime);
+            result.setCheckDuration(duration.toMillis() / 1000);
+            result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+            result.setRemainingTime(0L);
+        } else {
+            if (null == jobItemProgress.getRecordsCount()) {
+                inventoryFinishedPercentage = 0;
+            } else {
+                inventoryFinishedPercentage = BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() * 100, jobItemProgress.getRecordsCount())).intValue();

Review Comment:
   `getCheckedRecordsCount` might be greater than `getRecordsCount`; the percentage must not exceed 100.
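
   One way to cap the value, as a sketch. The class and method names are hypothetical, and treating a non-positive total as 0 percent is an assumption:

```java
public final class FinishedPercentage {
    
    // Returns an integer percentage clamped to [0, 100].
    // Assumption: a non-positive recordsCount means the total is unknown and is reported as 0.
    // Note: checkedRecordsCount * 100 can overflow for counts near Long.MAX_VALUE; ignored in this sketch.
    public static int calculate(final long checkedRecordsCount, final long recordsCount) {
        if (recordsCount <= 0) {
            return 0;
        }
        long percentage = Math.floorDiv(checkedRecordsCount * 100, recordsCount);
        return (int) Math.max(0, Math.min(100, percentage));
    }
    
    public static void main(final String[] args) {
        System.out.println(calculate(150, 100));
    }
}
```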



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
+    private String tableName;
+    
+    private volatile Long recordsCount;
+    
+    private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+    
+    private final long checkBeginTimeMillis;
+    
+    private Long checkEndTimeMillis;
+    
     private final ConsistencyCheckJobConfiguration jobConfig;
     
     public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
+        this.checkBeginTimeMillis = System.currentTimeMillis();

Review Comment:
   `this.` prefix is not necessary



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+    
+    private String tableName;
+    
+    private Boolean checkResult;
+    
+    private int inventoryFinishedPercentage;
+    
+    private Long remainingTime;
+    
+    private String checkBeginTime;
+    
+    private String checkEndTime;
+    
+    private Long checkDuration;

Review Comment:
   These field names could be updated to stay consistent with the DistSQL response column names.
   The same applies to the related YAML class and to `ConsistencyCheckJobProgress`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -99,8 +100,10 @@ protected void runBlocking() {
             DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
                     parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
             this.calculateAlgorithm = calculateAlgorithm;
-            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, jobItemContext.getShardingItem());

Review Comment:
   `addJobProgressPersistContext` already exists in `ConsistencyCheckJob`, so this call looks duplicated.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java:
##########
@@ -125,6 +128,7 @@ public void onSuccess() {
             jobItemContext.setStatus(JobStatus.FINISHED);
             checkJobAPI.persistJobItemProgress(jobItemContext);
             checkJobAPI.stop(checkJobId);
+            
         }

Review Comment:
   Blank line in method should be removed



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java:
##########
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
      *
      * @param pipelineJobConfig job configuration
      * @param calculateAlgorithm calculate algorithm
+     * @param checkJobProgressListener consistency check job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
+                                                                 PipelineJobProgressListener checkJobProgressListener);

Review Comment:
   `PipelineJobProgressListener jobProgressListener` in this interface is confusing, since there's already a `PipelineJobProgressListener` for the inventory job.
   Could we just use `ConsistencyCheckJobItemContext`?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java:
##########
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
+    private final ConsistencyCheckJobItemContext consistencyCheckJobItemContext;

Review Comment:
   `consistencyCheckJobItemContext` could be shorter, e.g. `checkJobItemContext`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -43,17 +48,34 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
+    private String tableName;

Review Comment:
   It's better to use `Collection<String> tableNames` to replace `String tableName`, since there might be several tables
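
   A small sketch of what holding several table names could look like. `TableNamesHolder` is a hypothetical name, and choosing `CopyOnWriteArraySet` (thread-safe, iterates in insertion order, drops duplicates) is an assumption rather than a requirement:

```java
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;

public final class TableNamesHolder {
    
    // Thread-safe set, since the context may be updated from checker threads (assumption).
    private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
    
    public void addTableName(final String tableName) {
        tableNames.add(tableName);
    }
    
    public Collection<String> getTableNames() {
        return tableNames;
    }
    
    // Joins the names for display, e.g. in a single result column.
    public String joinedTableNames() {
        return String.join(",", tableNames);
    }
    
    public static void main(final String[] args) {
        TableNamesHolder holder = new TableNamesHolder();
        holder.addTableName("t_order");
        holder.addTableName("t_order_item");
        System.out.println(holder.joinedTableNames());
    }
}
```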



##########
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java:
##########
@@ -167,16 +168,16 @@ protected void assertCheckMigrationSuccess(final String jobId, final String algo
         List<Map<String, Object>> checkJobResults = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
             checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
-            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+            List<String> checkEndTimeList = checkJobResults.stream().map(map -> map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+            if (checkEndTimeList.size() == checkJobResults.size()) {
                 break;
             }
             ThreadUtil.sleep(5, TimeUnit.SECONDS);
         }
-        assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
-            assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
-            assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+            assertTrue(Boolean.parseBoolean(entry.get("check_result").toString()));
+            assertThat(entry.get("inventory_finished_percentage").toString(), is("100"));

Review Comment:
   These DistSQL response column names could be updated to keep them consistent.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -156,6 +165,45 @@ public void stopByParentJobId(final String parentJobId) {
         stop(checkLatestJobId.get());
     }
     
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        int inventoryFinishedPercentage;

Review Comment:
   Variable names could be improved to stay consistent with the DistSQL response column names,
   e.g. `inventoryFinishedPercentage`, and the others as well.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java:
##########
@@ -27,4 +27,14 @@
 public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
     
     private String status;
+    
+    private String tableName;
+    

Review Comment:
   It's better to use `tableNames`, since there might be several tables



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+
+/**
+ * Consistency check jon progress info.
+ */
+
+@Data
+public final class ConsistencyCheckJobProgressInfo {
+    
+    private String tableName;
+    

Review Comment:
   It's better to use `tableNames`, since there might be several tables



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org