You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/06 09:52:17 UTC

[doris] branch dev-1.0.1 updated: [refactor] if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new e5a15b43b1 [refactor] if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)
e5a15b43b1 is described below

commit e5a15b43b141c71bbc648e7a912f271c2e90d1ee
Author: minghong <mi...@163.com>
AuthorDate: Wed Jul 6 17:52:10 2022 +0800

    [refactor] if pending bytes exceeded, vtableSink wait until pending bytes consumed or task was cancelled (#10644)
    
    * if pending bytes exceed the limit, vtableSink waits until the pending bytes are consumed or the task is cancelled
    
    * refactor
---
 be/src/common/config.h           |  4 ----
 be/src/exec/tablet_sink.cpp      |  6 ++----
 be/src/vec/sink/vtablet_sink.cpp | 24 +++++++-----------------
 3 files changed, 9 insertions(+), 25 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3174b445c2..2f956e3b8d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -720,10 +720,6 @@ CONF_Int32(quick_compaction_max_rows, "1000");
 CONF_Int32(quick_compaction_batch_size, "10");
 // do compaction min rowsets
 CONF_Int32(quick_compaction_min_rowsets, "10");
-
-//memory limitation for batches in pending queue, default 500M
-CONF_Int64(table_sink_pending_bytes_limitation, "524288000");
-
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index adb44a5843..2d9733e9bf 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -260,8 +260,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
-           _pending_batches_num > 0) {
+    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
@@ -310,8 +309,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
-           _pending_batches_num > 0) {
+    while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index c90e3b65b7..f0239b23c6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -58,9 +58,8 @@ Status VOlapTableSink::open(RuntimeState* state) {
 
 size_t VOlapTableSink::get_pending_bytes() const {
     size_t mem_consumption = 0;
-    for (auto& indexChannel : _channels){
+    for (auto& indexChannel : _channels) {
         mem_consumption += indexChannel->get_pending_bytes();
-        
     }
     return mem_consumption;
 }
@@ -116,20 +115,10 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
     if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
         _partition_to_tablet_map.clear();
     }
-    
-    //if pending bytes is more than table_sink_pending_bytes_limitation, wait at most 1 min
-    size_t MAX_PENDING_BYTES = config::table_sink_pending_bytes_limitation;
-    constexpr int max_retry = 120;
-    int retry = 0;
-    while (get_pending_bytes() > MAX_PENDING_BYTES && retry++ < max_retry) {
-        std::this_thread::sleep_for(std::chrono::microseconds(500));
-    }
-    if (get_pending_bytes() > MAX_PENDING_BYTES) {
-        std::stringstream str;
-        str << "Load task " << _load_id
-            << ": pending bytes exceed limit (config::table_sink_pending_bytes_limitation):"
-            << MAX_PENDING_BYTES;
-        return Status::MemoryLimitExceeded(str.str());
+
+    size_t MAX_PENDING_BYTES = _load_mem_limit / 3;
+    while (get_pending_bytes() > MAX_PENDING_BYTES && !state->is_cancelled()) {
+        std::this_thread::sleep_for(std::chrono::microseconds(100));
     }
 
     for (int i = 0; i < num_rows; ++i) {
@@ -140,7 +129,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
         uint32_t tablet_index = 0;
         block_row = {&block, i};
         if (!_vpartition->find_partition(&block_row, &partition)) {
-            RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> std::string { return ""; },
+            RETURN_IF_ERROR(state->append_error_msg_to_file(
+                    []() -> std::string { return ""; },
                     [&]() -> std::string {
                         fmt::memory_buffer buf;
                         fmt::format_to(buf, "no partition for this tuple. tuple=[]");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org