You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/03 03:59:15 UTC

[incubator-doris] branch master updated: [Bug][RoutineLoad] Avoid TOO_MANY_TASKS error (#6342)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c208e9  [Bug][RoutineLoad] Avoid TOO_MANY_TASKS error (#6342)
2c208e9 is described below

commit 2c208e932bd9a69a76b75fea2bd3fe977dc91e98
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Aug 3 11:59:06 2021 +0800

    [Bug][RoutineLoad] Avoid TOO_MANY_TASKS error (#6342)
    
    Use `commitAsync` to commit offsets to Kafka instead of `commitSync`, which may block for a long time.
    Also assign a group.id to the routine load job if the user has not specified the "property.group.id" property,
    so that all consumers of this job use the same group.id instead of a random id for each consume task.
---
 be/src/runtime/routine_load/data_consumer.cpp      | 22 +++++++---
 .../routine_load/routine_load_task_executor.cpp    |  5 ++-
 .../load/routineload/KafkaRoutineLoadJob.java      |  5 +++
 .../doris/load/routineload/RoutineLoadManager.java | 47 +++++++++++++++++-----
 .../load/routineload/RoutineLoadTaskScheduler.java | 35 +++++-----------
 .../load/routineload/RoutineLoadManagerTest.java   |  4 +-
 6 files changed, 72 insertions(+), 46 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index c5c54e4..34cba02 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -33,6 +33,7 @@
 
 namespace doris {
 
+static const std::string PROP_GROUP_ID = "group.id";
 // init kafka consumer will only set common configs such as
 // brokers, groupid
 Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
@@ -47,10 +48,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     // conf has to be deleted finally
     Defer delete_conf{[conf]() { delete conf; }};
 
-    std::stringstream ss;
-    ss << BackendOptions::get_localhost() << "_";
-    std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
-    LOG(INFO) << "init kafka consumer with group id: " << group_id;
 
     std::string errstr;
     auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
@@ -74,7 +71,6 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     };
 
     RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
-    RETURN_IF_ERROR(set_conf("group.id", group_id));
     RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
     RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
@@ -108,6 +104,18 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
         _custom_properties.emplace(item.first, item.second);
     }
 
+    // if not specified group id, generate a random one.
+    // ATTN: In the new version, we have set a group.id on the FE side for jobs that have not set a groupid,
+    // but in order to ensure compatibility, we still do a check here.
+    if (_custom_properties.find(PROP_GROUP_ID) == _custom_properties.end()) {
+        std::stringstream ss;
+        ss << BackendOptions::get_localhost() << "_";
+        std::string group_id = ss.str() + UniqueId::gen_uid().to_string();
+        RETURN_IF_ERROR(set_conf(PROP_GROUP_ID, group_id));
+        _custom_properties.emplace(PROP_GROUP_ID, group_id);
+    }
+    LOG(INFO) << "init kafka consumer with group id: " << _custom_properties[PROP_GROUP_ID];
+
     if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
         std::stringstream ss;
         ss << "PAUSE: failed to set 'event_cb'";
@@ -354,7 +362,9 @@ Status KafkaDataConsumer::reset() {
 }
 
 Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
-    RdKafka::ErrorCode err = _k_consumer->commitSync(offset);
+    // Use async commit so that it will not block for a long time.
+    // Commit failure has no effect on Doris, subsequent tasks will continue to commit the new offset
+    RdKafka::ErrorCode err = _k_consumer->commitAsync(offset);
     if (err != RdKafka::ERR_NO_ERROR) {
         std::stringstream ss;
         ss << "failed to commit kafka offset : " << RdKafka::err2str(err);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index f8f9c71..a574309 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -325,9 +325,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
             std::for_each(topic_partitions.begin(), topic_partitions.end(),
                           [](RdKafka::TopicPartition* tp1) { delete tp1; });
         }};
-    } break;
+        break;
+    }
     default:
-        return;
+        break;
     }
     cb(ctx);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index ead99ea..a357055 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -76,6 +76,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
 
     public static final String KAFKA_FILE_CATALOG = "kafka";
+    public static final String PROP_GROUP_ID = "group.id";
 
     private String brokerList;
     private String topic;
@@ -446,6 +447,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         if (!stmt.getCustomKafkaProperties().isEmpty()) {
             setCustomKafkaProperties(stmt.getCustomKafkaProperties());
         }
+        // set group id if not specified
+        if (!this.customProperties.containsKey(PROP_GROUP_ID)) {
+            this.customProperties.put(PROP_GROUP_ID, name + "_" + UUID.randomUUID().toString());
+        }
     }
 
     // this is a unprotected method which is called in the initialization function
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 05ebeb3..045fbd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -307,6 +307,7 @@ public class RoutineLoadManager implements Writable {
     // get the BE id with minimum running task on it
     // return -1 if no BE is available.
     // throw exception if unrecoverable errors happen.
+    // ATTN: this is only used for unit test now.
     public long getMinTaskBeId(String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
@@ -344,30 +345,54 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public boolean checkBeToTask(long beId, String clusterName) throws LoadException {
+    public long getAvailableBeForTask(long previoudBeId, String clusterName) throws LoadException {
         List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
         if (beIdsInCluster == null) {
             throw new LoadException("The " + clusterName + " has been deleted");
         }
 
-        if (!beIdsInCluster.contains(beId)) {
-            return false;
+        if (previoudBeId != -1L && !beIdsInCluster.contains(previoudBeId)) {
+            return -1L;
         }
 
         // check if be has idle slot
         readLock();
         try {
-            int idleTaskNum = 0;
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
-            if (beIdToConcurrentTasks.containsKey(beId)) {
-                idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
-            } else {
-                idleTaskNum = Config.max_routine_load_task_num_per_be;
+            // 1. Find if the given BE id has available slots
+            if (previoudBeId != -1L) {
+                int idleTaskNum = 0;
+                if (beIdToConcurrentTasks.containsKey(previoudBeId)) {
+                    idleTaskNum = beIdToMaxConcurrentTasks.get(previoudBeId) - beIdToConcurrentTasks.get(previoudBeId);
+                } else {
+                    idleTaskNum = Config.max_routine_load_task_num_per_be;
+                }
+                if (idleTaskNum > 0) {
+                    return previoudBeId;
+                }
             }
-            if (idleTaskNum > 0) {
-                return true;
+
+            // 2. The given BE id does not have available slots, find a BE with min tasks
+            updateBeIdToMaxConcurrentTasks();
+            int idleTaskNum = 0;
+            long resultBeId = -1L;
+            int maxIdleSlotNum = 0;
+            for (Long beId : beIdsInCluster) {
+                if (beIdToMaxConcurrentTasks.containsKey(beId)) {
+                    if (beIdToConcurrentTasks.containsKey(beId)) {
+                        idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
+                    } else {
+                        idleTaskNum = Config.max_routine_load_task_num_per_be;
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
+                                beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
+                    }
+                    resultBeId = maxIdleSlotNum < idleTaskNum ? beId : resultBeId;
+                    maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
+                }
             }
-            return false;
+            return resultBeId;
         } finally {
             readUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index e353c79..44cd71c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -278,34 +278,19 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     // return true if allocate successfully. return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        if (routineLoadTaskInfo.getPreviousBeId() != -1L) {
-            if (routineLoadManager.checkBeToTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName())) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                                      .add("job_id", routineLoadTaskInfo.getJobId())
-                                      .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
-                                      .add("msg", "task use the previous be id")
-                                      .build());
-                }
-                routineLoadTaskInfo.setBeId(routineLoadTaskInfo.getPreviousBeId());
-                return true;
-            }
-        }
-
-        // the previous BE is not available, try to find a better one
-        long beId = routineLoadManager.getMinTaskBeId(routineLoadTaskInfo.getClusterName());
-        if (beId < 0) {
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        if (beId == -1L) {
             return false;
         }
 
-        routineLoadTaskInfo.setBeId(beId);
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                              .add("job_id", routineLoadTaskInfo.getJobId())
-                              .add("be_id", routineLoadTaskInfo.getBeId())
-                              .add("msg", "task has been allocated to be")
-                              .build());
+                    .add("job_id", routineLoadTaskInfo.getJobId())
+                    .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
+                    .add("assigned_be_id", beId)
+                    .build());
         }
+        routineLoadTaskInfo.setBeId(beId);
         return true;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index ea460e4..fb8fe9b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PauseRoutineLoadStmt;
 import org.apache.doris.analysis.ResumeRoutineLoadStmt;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
@@ -739,7 +739,7 @@ public class RoutineLoadManagerTest {
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         Config.max_routine_load_task_num_per_be = 10;
         Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
-        Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default"));
+        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
     }
 
     @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org