Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/20 07:08:18 UTC

[incubator-doris] 02/03: [fix](routine_load) Add retry mechanism for routine load tasks that encounter broker transport failure (#9067)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5cba77ba856d19c40466159e45638a9db510c43f
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Wed Apr 20 14:49:58 2022 +0800

    [fix](routine_load) Add retry mechanism for routine load tasks that encounter broker transport failure (#9067)
---
 be/src/runtime/routine_load/data_consumer.cpp | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 2cf330f0b5..33934bbd1d 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -75,6 +75,8 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
     RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
     RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));
+    RETURN_IF_ERROR(set_conf("socket.keepalive.enable", "true"));
+    RETURN_IF_ERROR(set_conf("reconnect.backoff.jitter.ms", "100"));
     RETURN_IF_ERROR(set_conf("api.version.request", "true"));
     RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));
     RETURN_IF_ERROR(set_conf("broker.version.fallback", config::kafka_broker_version_fallback));
@@ -173,12 +175,14 @@ Status KafkaDataConsumer::assign_topic_partitions(
 
 Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                                         int64_t max_running_time_ms) {
+    static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
     int64_t left_time = max_running_time_ms;
     LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
               << ", max running time(ms): " << left_time;
 
     int64_t received_rows = 0;
     int64_t put_rows = 0;
+    int32_t retry_times = 0;
     Status st = Status::OK();
     MonotonicStopWatch consumer_watch;
     MonotonicStopWatch watch;
@@ -220,6 +224,12 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             // if there is no data in kafka.
             LOG(INFO) << "kafka consume timeout: " << _id;
             break;
+        case RdKafka::ERR__TRANSPORT:
+            LOG(INFO) << "kafka consume Disconnected: " << _id << ", retry times: " << retry_times++;
+            if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(200));
+                break;
+            }
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
             done = true;

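For context, the two properties added in KafkaDataConsumer::init() are standard librdkafka consumer settings: socket.keepalive.enable turns on TCP keepalive (SO_KEEPALIVE) on broker sockets so half-open connections are noticed instead of hanging silently, and reconnect.backoff.jitter.ms paces broker reconnection attempts with jitter so many tasks do not reconnect to a recovering broker at the same instant. A minimal standalone sketch of setting them through librdkafka's C++ Conf API (not Doris's set_conf() wrapper) might look like:

    #include <librdkafka/rdkafkacpp.h>

    #include <iostream>
    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
        std::string errstr;
        std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

        // The two properties the patch adds in KafkaDataConsumer::init():
        //  - socket.keepalive.enable: enable TCP keepalive on broker sockets so
        //    dead connections are detected instead of hanging silently.
        //  - reconnect.backoff.jitter.ms: pace broker reconnection attempts with
        //    jitter so consumers do not all hammer a recovering broker at once.
        const std::vector<std::pair<std::string, std::string>> props = {
                {"socket.keepalive.enable", "true"},
                {"reconnect.backoff.jitter.ms", "100"},
        };
        for (const auto& kv : props) {
            if (conf->set(kv.first, kv.second, errstr) != RdKafka::Conf::CONF_OK) {
                std::cerr << "failed to set " << kv.first << ": " << errstr << std::endl;
                return 1;
            }
        }
        std::cout << "consumer conf prepared" << std::endl;
        return 0;
    }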

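The change in group_consume() is a bounded retry: on RdKafka::ERR__TRANSPORT the task sleeps 200 ms and tries again up to 3 times; once the budget is exhausted the case falls through to the default branch and the task fails as before. Below is a simplified standalone sketch of the same pattern; ErrCode, consume_once, and the constants are illustrative stand-ins, not Doris or librdkafka APIs.

    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <thread>

    // Illustrative error codes; stand-ins for RdKafka::ErrorCode values.
    enum class ErrCode { OK, TRANSPORT, OTHER };

    // Retry a consume callback on transient transport errors, mirroring the
    // bounded retry added to KafkaDataConsumer::group_consume().
    ErrCode consume_with_retry(const std::function<ErrCode()>& consume_once) {
        constexpr int kMaxRetries = 3;                        // same bound as the patch
        constexpr auto kBackoff = std::chrono::milliseconds(200);
        int retry_times = 0;
        while (true) {
            ErrCode err = consume_once();
            if (err != ErrCode::TRANSPORT) {
                return err;                                   // success or non-retriable error
            }
            if (retry_times++ >= kMaxRetries) {
                return err;                                   // retry budget exhausted, give up
            }
            std::this_thread::sleep_for(kBackoff);            // brief pause before retrying
        }
    }

    int main() {
        int calls = 0;
        // Simulate a broker connection that drops twice before recovering.
        ErrCode result = consume_with_retry([&calls]() {
            return (++calls < 3) ? ErrCode::TRANSPORT : ErrCode::OK;
        });
        std::cout << "calls=" << calls << ", recovered=" << (result == ErrCode::OK) << std::endl;
        return 0;
    }

As in the patch, the budget is small and the sleep is short, so a brief broker hiccup is ridden out while a persistent outage still surfaces as a task failure quickly.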
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org