You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/03 12:03:25 UTC

[shardingsphere] branch master updated: Improve CDC XA records output (#26031)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa60dcf9232 Improve CDC XA records output (#26031)
fa60dcf9232 is described below

commit fa60dcf923213573fc89889ebf2ad9542731e83c
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Jun 3 20:03:15 2023 +0800

    Improve CDC XA records output (#26031)
---
 .../pipeline/cdc/core/importer/CDCImporter.java    | 57 ++++++++++++++++------
 1 file changed, 42 insertions(+), 15 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 909b678994f..550cdd0f55d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
@@ -73,7 +74,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
     
     private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());
     
-    private final Cache<String, Pair<CDCChannelProgressPair, CDCAckPosition>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
+    private final Cache<String, List<Pair<CDCChannelProgressPair, CDCAckPosition>>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
     
     @Override
     protected void runBlocking() {
@@ -104,7 +105,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
                 rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
             }
             String ackId = CDCAckId.build(importerId).marshal();
-            ackCache.put(ackId, Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+            ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
             sink.write(ackId, records);
             Record lastRecord = records.get(records.size() - 1);
             if (lastRecord instanceof FinishedRecord) {
@@ -122,16 +123,41 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
         if (null != rateLimitAlgorithm) {
             rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
         }
-        prepareTransactionRecords(channelProgressPairs);
-        CSNRecords csnRecords = csnRecordsQueue.poll();
-        if (null == csnRecords) {
+        CSNRecords firstCsnRecords = null;
+        List<CSNRecords> csnRecordsList = new LinkedList<>();
+        for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
+            prepareTransactionRecords(channelProgressPairs);
+            CSNRecords csnRecords = csnRecordsQueue.peek();
+            if (null == csnRecords) {
+                continue;
+            }
+            if (null == firstCsnRecords) {
+                csnRecords = csnRecordsQueue.poll();
+                firstCsnRecords = csnRecords;
+                csnRecordsList.add(csnRecords);
+            } else if (csnRecords.getCsn() == firstCsnRecords.getCsn()) {
+                csnRecords = csnRecordsQueue.poll();
+                csnRecordsList.add(csnRecords);
+            }
+        }
+        if (csnRecordsList.isEmpty()) {
             timeUnit.sleep(timeout);
             return;
         }
         // TODO Combine small transactions into a large transaction, to improve transformation performance.
         String ackId = CDCAckId.build(importerId).marshal();
-        List<Record> records = csnRecords.getRecords();
-        ackCache.put(ackId, Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+        if (1 == csnRecordsList.size()) {
+            CSNRecords csnRecords = csnRecordsList.get(0);
+            List<Record> records = csnRecords.getRecords();
+            ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+            sink.write(ackId, filterDataRecords(records));
+            return;
+        }
+        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
+                new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
+        ackCache.put(ackId, ackValue);
+        List<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
+        csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
         sink.write(ackId, filterDataRecords(records));
     }
     
@@ -160,8 +186,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
                 csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
             }
         } else {
-            CSNRecords csnRecords = csnRecordsQueue.peek();
-            long oldestCSN = findFirstDataRecord(csnRecords.getRecords()).getCsn();
+            long oldestCSN = csnRecordsQueue.peek().getCsn();
             for (CDCChannelProgressPair each : channelProgressPairs) {
                 PipelineChannel channel = each.getChannel();
                 List<Record> records = channel.peekRecords();
@@ -174,7 +199,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
                     continue;
                 }
                 long csn = findFirstDataRecord(records).getCsn();
-                if (csn < oldestCSN) {
+                if (csn <= oldestCSN) {
                     records = channel.pollRecords();
                     csnRecordsQueue.add(new CSNRecords(csn, each, records));
                 }
@@ -197,14 +222,16 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
      * @param ackId ack id
      */
     public void ack(final String ackId) {
-        Pair<CDCChannelProgressPair, CDCAckPosition> channelPositionPair = ackCache.getIfPresent(ackId);
-        if (null == channelPositionPair) {
+        List<Pair<CDCChannelProgressPair, CDCAckPosition>> channelPositionPairList = ackCache.getIfPresent(ackId);
+        if (null == channelPositionPairList) {
             log.warn("Could not find cached ack info, ack id: {}", ackId);
             return;
         }
-        CDCAckPosition ackPosition = channelPositionPair.getRight();
-        channelPositionPair.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
-        channelPositionPair.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+        for (Pair<CDCChannelProgressPair, CDCAckPosition> each : channelPositionPairList) {
+            CDCAckPosition ackPosition = each.getRight();
+            each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
+            each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+        }
     }
     
     @Override