You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/02 11:01:32 UTC

[doris] branch dev-1.0.1 updated: [hot-fix] fix a typo error and limit the max wait time in VOlapTableSink::send (#10552)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new 94a883d68e [hot-fix] fix a typo error and limit the max wait time in VOlapTableSink::send (#10552)
94a883d68e is described below

commit 94a883d68eb9deaa0d76960479295fb0fc0e2715
Author: minghong <mi...@163.com>
AuthorDate: Sat Jul 2 19:01:28 2022 +0800

    [hot-fix] fix a typo error and limit the max wait time in VOlapTableSink::send (#10552)
---
 be/src/common/config.h           |  3 +++
 be/src/vec/sink/vtablet_sink.cpp | 19 +++++++++++++------
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ddee9165b8..3174b445c2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -721,6 +721,9 @@ CONF_Int32(quick_compaction_batch_size, "10");
 // do compaction min rowsets
 CONF_Int32(quick_compaction_min_rowsets, "10");
 
+// memory limit (in bytes) for batches in the pending queue, default 500 MB (524288000 bytes)
+CONF_Int64(table_sink_pending_bytes_limitation, "524288000");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e78fc094aa..c90e3b65b7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -117,12 +117,19 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
         _partition_to_tablet_map.clear();
     }
     
-    //if pending bytes is more than 500M, wait
-    constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024;
-    if ( get_pending_bytes() > MAX_PENDING_BYTES){
-        while(get_pending_bytes() < MAX_PENDING_BYTES){
-            std::this_thread::sleep_for(std::chrono::microseconds(500));
-        }
+    //if pending bytes exceed table_sink_pending_bytes_limitation, retry up to 120 times, sleeping 500 microseconds each time (about 60 ms in total, not 1 min)
+    size_t MAX_PENDING_BYTES = config::table_sink_pending_bytes_limitation;
+    constexpr int max_retry = 120;
+    int retry = 0;
+    while (get_pending_bytes() > MAX_PENDING_BYTES && retry++ < max_retry) {
+        std::this_thread::sleep_for(std::chrono::microseconds(500));
+    }
+    if (get_pending_bytes() > MAX_PENDING_BYTES) {
+        std::stringstream str;
+        str << "Load task " << _load_id
+            << ": pending bytes exceed limit (config::table_sink_pending_bytes_limitation):"
+            << MAX_PENDING_BYTES;
+        return Status::MemoryLimitExceeded(str.str());
     }
 
     for (int i = 0; i < num_rows; ++i) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org