You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/09/16 10:07:04 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21019: Show migration status add processed records column

sandynz commented on code in PR #21019:
URL: https://github.com/apache/shardingsphere/pull/21019#discussion_r972859949


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java:
##########
@@ -44,4 +45,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
      * @return incremental tasks
      */
     Collection<IncrementalTask> getIncrementalTasks();
+    
+    /**
+     * Get processed record count.
+     *
+     * @return processed record count.
+     */
+    long getProcessedRecordCount();

Review Comment:
   Could we change it to `getProcessedRecordsCount`?



##########
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java:
##########
@@ -60,14 +61,15 @@ public void init(final ShardingSphereDatabase database, final SQLStatement sqlSt
                         result.add("");
                         result.add("");
                         result.add("");
+                        result.add("");
                     }
                     return result;
                 }).collect(Collectors.toList()).iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
+        return Arrays.asList("item", "data_source", "status", "active", "processed_records", "inventory_finished_percentage", "incremental_idle_seconds");

Review Comment:
   Could we use `processed_records_count`? Keep consistency with variable



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 /**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
  */
 @RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {

Review Comment:
   Could we rename class name `PipelineJobProcessUpdateParameter` to `PipelineJobProgressUpdatedParameter`? Similar to method name `onProgressUpdated`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java:
##########
@@ -37,4 +37,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
     private YamlJobItemInventoryTasksProgress inventory;
     
     private YamlJobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordCount;

Review Comment:
   processedRecordsCount too, and also yaml class



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java:
##########
@@ -39,4 +39,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
     private JobItemInventoryTasksProgress inventory;
     
     private JobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordCount;

Review Comment:
   Could use `processedRecordsCount` to replace `processedRecordCount`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProcessUpdateParameter.java:
##########
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 /**
- * Default pipeline job progress listener implementation.
+ * Pipeline job process update parameter.
  */
 @RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
+@Getter
+public final class PipelineJobProcessUpdateParameter {
     
-    private final String jobId;
+    private final int insertRecordNumber;
     
-    private final int shardingItem;
-    
-    @Override
-    public void onProgressUpdated() {
-        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
-    }
+    private final int deleteRecordNumber;

Review Comment:
   1, Could we replace `RecordNumber` to `RecordsCount`?
   
   2, insert and deleted could inserted and deleted
   
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -92,6 +97,9 @@ public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final
         this.shardingItem = shardingItem;
         this.dataSourceName = taskConfig.getDataSourceName();
         this.initProgress = initProgress;
+        if (null != initProgress) {
+            processedRecordCount.addAndGet(initProgress.getProcessedRecordCount());
+        }

Review Comment:
   It's better to use `set` instead of `addAndGet`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java:
##########
@@ -125,4 +133,16 @@ public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
     public boolean isSourceTargetDatabaseTheSame() {
         return jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
     }
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProcessUpdateParameter parameter) {
+        int needAddNumber = parameter.getInsertRecordNumber() - parameter.getDeleteRecordNumber();
+        processedRecordCount.addAndGet(needAddNumber);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+    }
+    
+    @Override
+    public long getProcessedRecordCount() {
+        return processedRecordCount.get();
+    }

Review Comment:
   processedRecordsCount too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org