Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/12 03:01:03 UTC

[shardingsphere] branch master updated: Improve processed records count calculation in pipeline incremental task. (#21513)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b72b03d30c Improve processed records count calculation in pipeline incremental task. (#21513)
3b72b03d30c is described below

commit 3b72b03d30cd8919136a3ef3d6f548c3e17baf82
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Oct 12 11:00:55 2022 +0800

    Improve processed records count calculation in pipeline incremental task. (#21513)
    
    * Calculate processed records count before the merge operation
    
    * Fix code style
    
    * Add null check
---
 .../data/pipeline/core/importer/DefaultImporter.java      | 15 +++++++++------
 .../data/pipeline/mysql/ingest/client/MySQLClient.java    |  6 +++++-
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 699fc6d1878..718bc121d18 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -115,17 +115,20 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
-        List<GroupedDataRecord> result = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+        List<DataRecord> dataRecords = buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList());
         int insertRecordNumber = 0;
-        int deleteRecordNumber = 0;
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertRecordNumber++;
+            }
+        }
+        List<GroupedDataRecord> result = MERGER.group(dataRecords);
         for (GroupedDataRecord each : result) {
-            deleteRecordNumber += null != each.getDeleteDataRecords() ? each.getDeleteDataRecords().size() : 0;
             flushInternal(dataSource, each.getDeleteDataRecords());
-            insertRecordNumber += null != each.getInsertDataRecords() ? each.getInsertDataRecords().size() : 0;
             flushInternal(dataSource, each.getInsertDataRecords());
             flushInternal(dataSource, each.getUpdateDataRecords());
         }
-        return new PipelineJobProgressUpdatedParameter(insertRecordNumber - deleteRecordNumber);
+        return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
     }
     
     private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
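
Why the flush() change matters: MERGER.group(...) can collapse several operations on the
same record (for example an INSERT followed by a DELETE on the same key), so counts derived
from the grouped result no longer match the number of records the task actually processed;
the fix counts INSERT records from the raw buffer before grouping and reports that count
alone. The sketch below is a minimal, self-contained illustration of the effect; DataRecord
and merge() here are simplified stand-ins, not ShardingSphere's actual DataRecord or merger.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    public final class CountBeforeMergeSketch {
        
        // Simplified stand-in for a pipeline data record: an operation type and a key.
        record DataRecord(String type, String key) {
        }
        
        // Naive merge: a later operation on the same key replaces the earlier one,
        // mimicking how a merger deduplicates records before flushing.
        static List<DataRecord> merge(final List<DataRecord> records) {
            Map<String, DataRecord> merged = new LinkedHashMap<>();
            for (DataRecord each : records) {
                merged.put(each.key(), each);
            }
            return new ArrayList<>(merged.values());
        }
        
        public static void main(final String[] args) {
            List<DataRecord> buffer = List.of(
                    new DataRecord("INSERT", "order-1"),
                    new DataRecord("INSERT", "order-2"),
                    new DataRecord("DELETE", "order-1"));
            // Count INSERT records from the raw buffer, before merging (the fixed behavior).
            long countedBefore = buffer.stream().filter(each -> "INSERT".equals(each.type())).count();
            // Counting after the merge misses the INSERT on order-1 that was collapsed away.
            long countedAfter = merge(buffer).stream().filter(each -> "INSERT".equals(each.type())).count();
            System.out.println("before merge: " + countedBefore + ", after merge: " + countedAfter);
        }
    }

Running the sketch prints "before merge: 2, after merge: 1": the pre-merge count reflects
every record the task processed, which is what the progress listener should be told.
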
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 39c2144f44b..d532995c150 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -298,7 +298,7 @@ public final class MySQLClient {
             log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
@@ -306,6 +306,10 @@ public final class MySQLClient {
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
+                log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
+                return;
+            }
             log.info("reconnect MySQL client, retry times={}", retryTimes);
             closeChannel();
             connect();
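
The MySQLClient change adds a guard in reconnect(): if the connection drops before any
binlog event has been received, lastBinlogEvent (or its file name) is null and there is no
position to resume from, so the reconnect attempt is skipped instead of dereferencing null
state later in the method. Below is a minimal, self-contained sketch of that guarded
reconnect under the same retry cap; BinlogEvent and the commented-out resume step are
simplified assumptions for illustration, not MySQLClient's actual fields or API.

    import java.util.concurrent.atomic.AtomicInteger;
    
    public final class GuardedReconnectSketch {
        
        // Simplified stand-in for a binlog event carrying its file name and position.
        record BinlogEvent(String fileName, long position) {
        }
        
        private static final int MAX_RECONNECT_TIMES = 3;
        
        private final AtomicInteger reconnectTimes = new AtomicInteger();
        
        private volatile BinlogEvent lastBinlogEvent;
        
        void reconnect() {
            if (reconnectTimes.get() > MAX_RECONNECT_TIMES) {
                System.out.printf("exceeds the maximum number of retry times, last binlog event: %s%n", lastBinlogEvent);
                return;
            }
            int retryTimes = reconnectTimes.incrementAndGet();
            // The null check added by the commit: with no event (or no file name),
            // there is no position to resume from, so bail out early.
            if (null == lastBinlogEvent || null == lastBinlogEvent.fileName()) {
                System.out.printf("last binlog event is null or the file name is null, last binlog event: %s%n", lastBinlogEvent);
                return;
            }
            System.out.printf("reconnect MySQL client, retry times=%d%n", retryTimes);
            // closeChannel(); connect(); subscribe(lastBinlogEvent.fileName(), lastBinlogEvent.position());
            // (hypothetical resume step; the real method body continues past the hunk shown above)
        }
        
        public static void main(final String[] args) {
            GuardedReconnectSketch client = new GuardedReconnectSketch();
            client.reconnect();  // no last event yet -> logs and returns without reconnecting
            client.lastBinlogEvent = new BinlogEvent("binlog.000001", 4L);
            client.reconnect();  // a resume position exists -> proceeds with retry times=2
        }
    }
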