Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/10/12 01:32:21 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #21513: Improve processed records count calculation at pipeline increment task.

azexcy opened a new pull request, #21513:
URL: https://github.com/apache/shardingsphere/pull/21513

   The merge method may merge INSERT and DELETE operations, so calculate the count before merging.
   
   Changes proposed in this pull request:
     - Calculate the processed records count before the merge operation
     - Improve MySQL client reconnection
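   
   A minimal Java sketch of why the count has to be taken before merging. The `ChangeType`, `Change`, and `merge` names are hypothetical stand-ins rather than ShardingSphere's `DataRecord` or merger API, and the merge rule used here (an INSERT followed by a DELETE of the same key cancels out) is an assumption chosen only to illustrate the effect described above.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   public class CountBeforeMergeSketch {
   
       enum ChangeType { INSERT, UPDATE, DELETE }
   
       // Hypothetical stand-in for a data record: a change type plus a primary key.
       record Change(ChangeType type, long key) { }
   
       // Hypothetical merge: an INSERT followed by a DELETE of the same key cancels out;
       // any other later change simply replaces the earlier one for that key.
       static List<Change> merge(final List<Change> changes) {
           Map<Long, Change> net = new LinkedHashMap<>();
           for (Change each : changes) {
               Change previous = net.get(each.key());
               if (null != previous && ChangeType.INSERT == previous.type() && ChangeType.DELETE == each.type()) {
                   net.remove(each.key());
               } else {
                   net.put(each.key(), each);
               }
           }
           return new ArrayList<>(net.values());
       }
   
       static long countInserts(final List<Change> changes) {
           return changes.stream().filter(each -> ChangeType.INSERT == each.type()).count();
       }
   
       public static void main(final String[] args) {
           List<Change> buffer = List.of(
                   new Change(ChangeType.INSERT, 1L),
                   new Change(ChangeType.INSERT, 2L),
                   new Change(ChangeType.DELETE, 2L));
           // Counting the raw buffer sees both inserts.
           System.out.println("before merge: " + countInserts(buffer));
           // Counting the merged buffer loses the insert of key 2.
           System.out.println("after merge: " + countInserts(merge(buffer)));
       }
   }
   ```
   
   Running `main` prints `before merge: 2` and then `after merge: 1`: counting after the merge misses the row that was inserted and deleted within the same batch.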
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have passed maven check: `mvn clean install -B -T2C -DskipTests -Dmaven.javadoc.skip=true -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21513: Improve processed records count calculation at pipeline increment task.

azexcy commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992923456


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
                 running = false;
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent.getFileName()) {
+                log.warn("can't get file name from binlog event, last binlog event:{}", lastBinlogEvent);
+                return;
+            }

Review Comment:
   Sometimes `lastBinlogEvent` has not been set yet after `connect()`. If the migration is stopped at that point, `reconnect()` is executed and causes an NPE at `subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());`.



[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21513: Improve processed records count calculation at pipeline increment task.

sandynz commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992926238


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
                 running = false;
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent.getFileName()) {
+                log.warn("can't get file name from binlog event, last binlog event:{}", lastBinlogEvent);
+                return;
+            }

Review Comment:
   OK.
   It looks like `lastBinlogEvent.getPosition()` has a similar issue; could we check it too?
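   
   A minimal Java sketch of a guard that checks both values before resubscribing. `LastBinlogEvent`, `MAX_RECONNECT_TIMES`, and `subscribe` are hypothetical stand-ins for the fields and method in `MySQLClient`; in particular, holding the position as a nullable `Long` that is unboxed into a `long` parameter is an assumption used to show how a missing position could also raise an NPE.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class ReconnectGuardSketch {
   
       // Hypothetical holder for the last binlog position seen; both fields stay
       // null until a binlog event has been recorded after connect().
       static final class LastBinlogEvent {
           String fileName;
           Long position;
       }
   
       // Mirrors the `reconnectTimes.get() > 3` check in the diff.
       private static final int MAX_RECONNECT_TIMES = 3;
   
       private final AtomicInteger reconnectTimes = new AtomicInteger();
   
       final LastBinlogEvent lastBinlogEvent = new LastBinlogEvent();
   
       volatile boolean running = true;
   
       // Returns true only if it resubscribed from the last recorded position.
       boolean reconnect() {
           if (reconnectTimes.get() > MAX_RECONNECT_TIMES) {
               running = false;
               return false;
           }
           reconnectTimes.incrementAndGet();
           // Check both values before resubscribing: in this sketch a null position would
           // throw on unboxing, and a null file name is not a valid starting point either.
           if (null == lastBinlogEvent.fileName || null == lastBinlogEvent.position) {
               return false;
           }
           subscribe(lastBinlogEvent.fileName, lastBinlogEvent.position);
           return true;
       }
   
       // Hypothetical stand-in for the real subscribe call; passing a null Long to
       // this long parameter would throw NullPointerException on unboxing.
       void subscribe(final String fileName, final long position) {
           System.out.println("subscribe from " + fileName + "#" + position);
       }
   }
   ```
   
   Calling `reconnect()` on a fresh instance returns `false` instead of throwing; once both fields are set, it resubscribes from the recorded position.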



[GitHub] [shardingsphere] sandynz commented on a diff in pull request #21513: Improve processed records count calculation at pipeline increment task.

sandynz commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992920684


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java:
##########
@@ -115,13 +115,20 @@ private void write() {
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
-        List<GroupedDataRecord> result = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+        List<DataRecord> dataRecords = buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList());
         int insertRecordNumber = 0;
         int deleteRecordNumber = 0;
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertRecordNumber++;
+            }
+            if (IngestDataChangeType.DELETE.equals(each.getType())) {
+                deleteRecordNumber++;
+            }

Review Comment:
   Currently, `processedRecordsCount = insertRecordNumber - deleteRecordNumber`.
   It looks like `deleteRecordNumber` should be ignored: in the migration case, a deleted record must have been inserted earlier, so it has already been counted as processed.
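   
   A minimal Java sketch comparing the two counting rules, using hypothetical names rather than the actual `DefaultImporter` code. It assumes the inventory task has already copied three rows and an incremental batch then deletes one of them and inserts a new one.
   
   ```java
   import java.util.List;
   
   public class ProcessedRecordsCountSketch {
   
       enum ChangeType { INSERT, UPDATE, DELETE }
   
       // Current rule described in the review: inserts minus deletes.
       static long insertsMinusDeletes(final List<ChangeType> batch) {
           long inserts = batch.stream().filter(each -> ChangeType.INSERT == each).count();
           long deletes = batch.stream().filter(each -> ChangeType.DELETE == each).count();
           return inserts - deletes;
       }
   
       // Suggested rule: ignore deletes, since a deleted row was already counted
       // when it was first inserted (by the inventory task or an earlier batch).
       static long insertsOnly(final List<ChangeType> batch) {
           return batch.stream().filter(each -> ChangeType.INSERT == each).count();
       }
   
       public static void main(final String[] args) {
           // Assumed number of rows already copied by the inventory task.
           long inventoryCount = 3;
           List<ChangeType> incrementalBatch = List.of(ChangeType.DELETE, ChangeType.INSERT);
           System.out.println(inventoryCount + insertsMinusDeletes(incrementalBatch)); // 3
           System.out.println(inventoryCount + insertsOnly(incrementalBatch)); // 4
       }
   }
   ```
   
   With inserts minus deletes, the running total stays at 3 even though a new row was written; ignoring deletes yields 4, in line with the reasoning that the deleted row was already counted.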



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
                 running = false;
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent.getFileName()) {
+                log.warn("can't get file name from binlog event, last binlog event:{}", lastBinlogEvent);
+                return;
+            }

Review Comment:
   How does it occur?



[GitHub] [shardingsphere] azexcy commented on a diff in pull request #21513: Improve processed records count calculation at pipeline increment task.

azexcy commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992923456


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
                 running = false;
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent.getFileName()) {
+                log.warn("can't get file name from binlog event, last binlog event:{}", lastBinlogEvent);
+                return;
+            }

Review Comment:
   Sometimes `lastBinlogEvent` has not been set yet after `connect()`. If the migration is stopped at that point, the `reconnect()` method is executed and causes an NPE at `subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());`.



[GitHub] [shardingsphere] sandynz merged pull request #21513: Improve processed records count calculation at pipeline increment task.

sandynz merged PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513


[GitHub] [shardingsphere] azexcy commented on pull request #21513: Improve processed records count calculation at pipeline increment task.

azexcy commented on PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#issuecomment-1275470513

   The CI process result:
   ```
   dataSourceName: ds_0
   incremental:
     delay:
       lastEventTimestamps: 1665537963000
       latestActiveTimeMillis: 1665537966842
     position: binlog.000001#182842306
   inventory:
     finished:
     - ds_0.t_order_copy#0
   processedRecordsCount: 3015
   sourceDatabaseType: MySQL
   status: EXECUTE_INCREMENTAL_TASK
   ```
   
   Now `processedRecordsCount` is correct while the incremental task is running.

