You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2024/04/10 06:53:56 UTC

(doris) branch master updated: [improve](routine-load) timely pause job if Kafka cluster exception when consume (#33372)

This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0991829916d [improve](routine-load) timely pause job if Kafka cluster exception when consume (#33372)
0991829916d is described below

commit 0991829916d9fda42fe6c7684df860d827bdaf56
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Wed Apr 10 14:53:51 2024 +0800

    [improve](routine-load) timely pause job if Kafka cluster exception when consume (#33372)
---
 .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index d2b9410568b..483754240f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -780,7 +780,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
             }
         } catch (Exception e) {
-            LOG.warn("failed to get latest partition offset. {}", e.getMessage(), e);
+            // It needs to pause job when can not get partition meta.
+            // To ensure the stability of the routine load,
+            // the scheduler will automatically pull up routine load job in this scenario,
+            // to avoid some network and Kafka exceptions causing the routine load job to stop
+            updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
+                        "failed to get latest partition offset. {}" + e.getMessage()),
+                        false /* not replay */);
             return false;
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org