You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/06/03 12:03:25 UTC
[shardingsphere] branch master updated: Improve CDC XA records output (#26031)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa60dcf9232 Improve CDC XA records output (#26031)
fa60dcf9232 is described below
commit fa60dcf923213573fc89889ebf2ad9542731e83c
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Jun 3 20:03:15 2023 +0800
Improve CDC XA records output (#26031)
---
.../pipeline/cdc/core/importer/CDCImporter.java | 57 ++++++++++++++++------
1 file changed, 42 insertions(+), 15 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 909b678994f..550cdd0f55d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
@@ -73,7 +74,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());
- private final Cache<String, Pair<CDCChannelProgressPair, CDCAckPosition>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
+ private final Cache<String, List<Pair<CDCChannelProgressPair, CDCAckPosition>>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
@Override
protected void runBlocking() {
@@ -104,7 +105,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
}
String ackId = CDCAckId.build(importerId).marshal();
- ackCache.put(ackId, Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+ ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
sink.write(ackId, records);
Record lastRecord = records.get(records.size() - 1);
if (lastRecord instanceof FinishedRecord) {
@@ -122,16 +123,41 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
}
- prepareTransactionRecords(channelProgressPairs);
- CSNRecords csnRecords = csnRecordsQueue.poll();
- if (null == csnRecords) {
+ CSNRecords firstCsnRecords = null;
+ List<CSNRecords> csnRecordsList = new LinkedList<>();
+ for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
+ prepareTransactionRecords(channelProgressPairs);
+ CSNRecords csnRecords = csnRecordsQueue.peek();
+ if (null == csnRecords) {
+ continue;
+ }
+ if (null == firstCsnRecords) {
+ csnRecords = csnRecordsQueue.poll();
+ firstCsnRecords = csnRecords;
+ csnRecordsList.add(csnRecords);
+ } else if (csnRecords.getCsn() == firstCsnRecords.getCsn()) {
+ csnRecords = csnRecordsQueue.poll();
+ csnRecordsList.add(csnRecords);
+ }
+ }
+ if (csnRecordsList.isEmpty()) {
timeUnit.sleep(timeout);
return;
}
// TODO Combine small transactions into a large transaction, to improve transformation performance.
String ackId = CDCAckId.build(importerId).marshal();
- List<Record> records = csnRecords.getRecords();
- ackCache.put(ackId, Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+ if (1 == csnRecordsList.size()) {
+ CSNRecords csnRecords = csnRecordsList.get(0);
+ List<Record> records = csnRecords.getRecords();
+ ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+ sink.write(ackId, filterDataRecords(records));
+ return;
+ }
+ List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
+ new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
+ ackCache.put(ackId, ackValue);
+ List<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
+ csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
sink.write(ackId, filterDataRecords(records));
}
@@ -160,8 +186,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
}
} else {
- CSNRecords csnRecords = csnRecordsQueue.peek();
- long oldestCSN = findFirstDataRecord(csnRecords.getRecords()).getCsn();
+ long oldestCSN = csnRecordsQueue.peek().getCsn();
for (CDCChannelProgressPair each : channelProgressPairs) {
PipelineChannel channel = each.getChannel();
List<Record> records = channel.peekRecords();
@@ -174,7 +199,7 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
continue;
}
long csn = findFirstDataRecord(records).getCsn();
- if (csn < oldestCSN) {
+ if (csn <= oldestCSN) {
records = channel.pollRecords();
csnRecordsQueue.add(new CSNRecords(csn, each, records));
}
@@ -197,14 +222,16 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
* @param ackId ack id
*/
public void ack(final String ackId) {
- Pair<CDCChannelProgressPair, CDCAckPosition> channelPositionPair = ackCache.getIfPresent(ackId);
- if (null == channelPositionPair) {
+ List<Pair<CDCChannelProgressPair, CDCAckPosition>> channelPositionPairList = ackCache.getIfPresent(ackId);
+ if (null == channelPositionPairList) {
log.warn("Could not find cached ack info, ack id: {}", ackId);
return;
}
- CDCAckPosition ackPosition = channelPositionPair.getRight();
- channelPositionPair.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
- channelPositionPair.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+ for (Pair<CDCChannelProgressPair, CDCAckPosition> each : channelPositionPairList) {
+ CDCAckPosition ackPosition = each.getRight();
+ each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
+ each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+ }
}
@Override