You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/04/25 09:24:47 UTC
(doris) branch branch-2.1 updated: [fix](routine-load) fix routine load lag is negative (#34113)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b873be65888 [fix](routine-load) fix routine load lag is negative (#34113)
b873be65888 is described below
commit b873be65888d6607ec97b7ca9171e799aa6f9b6f
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Thu Apr 25 17:24:41 2024 +0800
[fix](routine-load) fix routine load lag is negative (#34113)
* [fix](routine-load) fix routine load lag is negative (#33846)
* fix merge error
---
.../org/apache/doris/load/routineload/KafkaProgress.java | 4 ++++
.../doris/load/routineload/KafkaRoutineLoadJob.java | 15 +++++++++++++--
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 49542cd1406..53c57a1cceb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -85,6 +85,10 @@ public class KafkaProgress extends RoutineLoadProgress {
return partitionIdToOffset.get(kafkaPartition);
}
+ public Map<Integer, Long> getOffsetByPartition() {
+ return partitionIdToOffset;
+ }
+
public boolean containsPartition(Integer kafkaPartition) {
return partitionIdToOffset.containsKey(kafkaPartition);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8540bb43963..c00f16b7d8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return false;
}
+ private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
+ ((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream()
+ .forEach(entity -> {
+ if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
+ && cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
+ cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1);
+ }
+ });
+ this.progress.update(attachment);
+ }
+
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
super.updateProgress(attachment);
- this.progress.update(attachment);
+ updateProgressAndOffsetsCache(attachment);
}
@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
super.replayUpdateProgress(attachment);
- this.progress.update(attachment);
+ updateProgressAndOffsetsCache(attachment);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org