You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/04/25 09:24:47 UTC

(doris) branch branch-2.1 updated: [fix](routine-load) fix routine load lag is negative (#34113)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b873be65888 [fix](routine-load) fix routine load lag is negative (#34113)
b873be65888 is described below

commit b873be65888d6607ec97b7ca9171e799aa6f9b6f
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Thu Apr 25 17:24:41 2024 +0800

    [fix](routine-load) fix routine load lag is negative (#34113)
    
    * [fix](routine-load) fix routine load lag is negative (#33846)
    
    * fix merge error
---
 .../org/apache/doris/load/routineload/KafkaProgress.java  |  4 ++++
 .../doris/load/routineload/KafkaRoutineLoadJob.java       | 15 +++++++++++++--
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 49542cd1406..53c57a1cceb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -85,6 +85,10 @@ public class KafkaProgress extends RoutineLoadProgress {
         return partitionIdToOffset.get(kafkaPartition);
     }
 
+    public Map<Integer, Long> getOffsetByPartition() {
+        return partitionIdToOffset;
+    }
+
     public boolean containsPartition(Integer kafkaPartition) {
         return partitionIdToOffset.containsKey(kafkaPartition);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8540bb43963..c00f16b7d8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return false;
     }
 
+    private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
+        ((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream()
+                .forEach(entity -> {
+                    if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
+                            && cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
+                        cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1);
+                    }
+                });
+        this.progress.update(attachment);
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org