You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/12 03:01:03 UTC
[shardingsphere] branch master updated: Improve processed records count calculation at pipeline increment task. (#21513)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3b72b03d30c Improve processed records count calculation at pipeline increment task. (#21513)
3b72b03d30c is described below
commit 3b72b03d30cd8919136a3ef3d6f548c3e17baf82
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Oct 12 11:00:55 2022 +0800
Improve processed records count calculation at pipeline increment task. (#21513)
* Calculate processed records count before merged operation
* Fix codestyle
* Add null check
---
.../data/pipeline/core/importer/DefaultImporter.java | 15 +++++++++------
.../data/pipeline/mysql/ingest/client/MySQLClient.java | 6 +++++-
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 699fc6d1878..718bc121d18 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -115,17 +115,20 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
}
private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
- List<GroupedDataRecord> result = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+ List<DataRecord> dataRecords = buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList());
int insertRecordNumber = 0;
- int deleteRecordNumber = 0;
+ for (DataRecord each : dataRecords) {
+ if (IngestDataChangeType.INSERT.equals(each.getType())) {
+ insertRecordNumber++;
+ }
+ }
+ List<GroupedDataRecord> result = MERGER.group(dataRecords);
for (GroupedDataRecord each : result) {
- deleteRecordNumber += null != each.getDeleteDataRecords() ? each.getDeleteDataRecords().size() : 0;
flushInternal(dataSource, each.getDeleteDataRecords());
- insertRecordNumber += null != each.getInsertDataRecords() ? each.getInsertDataRecords().size() : 0;
flushInternal(dataSource, each.getInsertDataRecords());
flushInternal(dataSource, each.getUpdateDataRecords());
}
- return new PipelineJobProgressUpdatedParameter(insertRecordNumber - deleteRecordNumber);
+ return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
}
private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 39c2144f44b..d532995c150 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -298,7 +298,7 @@ public final class MySQLClient {
log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
reconnect();
}
-
+
private void reconnect() {
if (reconnectTimes.get() > 3) {
log.warn("exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
@@ -306,6 +306,10 @@ public final class MySQLClient {
return;
}
int retryTimes = reconnectTimes.incrementAndGet();
+ if (null == lastBinlogEvent || null == lastBinlogEvent.getFileName()) {
+ log.warn("last binlog event is null or the file name is null, last binlog event:{}", lastBinlogEvent);
+ return;
+ }
log.info("reconnect MySQL client, retry times={}", retryTimes);
closeChannel();
connect();