You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/22 13:59:42 UTC

[doris] branch master updated: [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad31b6c902 [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
ad31b6c902 is described below

commit ad31b6c902a3704bda5f42e4cec80ab085e094c0
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Jul 22 21:59:36 2022 +0800

    [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
---
 be/src/exec/analytic_eval_node.cpp              | 3 +--
 be/src/exec/es/es_scroll_parser.cpp             | 8 ++------
 be/src/exec/partitioned_aggregation_node.cc     | 6 ++----
 be/src/exec/partitioned_hash_table.cc           | 3 +--
 be/src/exprs/anyval_util.cpp                    | 3 +--
 be/src/exprs/expr_context.cpp                   | 5 ++---
 be/src/runtime/load_channel.cpp                 | 4 ++--
 be/src/runtime/load_channel.h                   | 2 +-
 be/src/runtime/memory/mem_tracker.h             | 3 ++-
 be/src/runtime/memory/mem_tracker_limiter.h     | 9 ++++++---
 be/src/runtime/memory/mem_tracker_task_pool.cpp | 6 +++---
 be/src/runtime/runtime_state.cpp                | 2 +-
 regression-test/conf/regression-conf.groovy     | 4 ++--
 13 files changed, 26 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 08d2dc2c35..f0e7f01989 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -202,8 +202,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
                 "Failed to acquire initial read buffer for analytic function "
                 "evaluation. Reducing query concurrency or increasing the memory limit may "
                 "help this query to complete successfully.");
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, msg);
+        RETURN_LIMIT_EXCEEDED(state, msg);
     }
 
     DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index 8dd3a2c9e1..a95af7bb51 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -353,9 +353,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
             if (UNLIKELY(buffer == nullptr)) {
                 std::string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED,
                                                           "MaterializeNextRow", len, "string slot");
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
-                        details, len, rst);
+                RETURN_LIMIT_EXCEEDED(nullptr, details, len, rst);
             }
             memcpy(buffer, _id.data(), len);
             reinterpret_cast<StringValue*>(slot)->ptr = buffer;
@@ -415,9 +413,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
             if (UNLIKELY(buffer == nullptr)) {
                 std::string details = strings::Substitute(
                         ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
-                        details, val_size, rst);
+                RETURN_LIMIT_EXCEEDED(nullptr, details, val_size, rst);
             }
             memcpy(buffer, val.data(), val_size);
             reinterpret_cast<StringValue*>(slot)->ptr = buffer;
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 2811375395..ad5f6788d9 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -411,8 +411,7 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des
                     "Cannot perform aggregation at node with id $0."
                     " Failed to allocate $1 output bytes.",
                     _id, sv->len);
-            RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                                  state_, details, sv->len, rst);
+            RETURN_LIMIT_EXCEEDED(state_, details, sv->len, rst);
         }
         memcpy(new_ptr, sv->ptr, sv->len);
         sv->ptr = new_ptr;
@@ -851,8 +850,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
     // TODO(ml): enable spill
     std::stringstream msg;
     msg << "New partitioned Aggregation in spill";
-    RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                          parent->state_, msg.str());
+    RETURN_LIMIT_EXCEEDED(parent->state_, msg.str());
 
     RETURN_IF_ERROR(SerializeStreamForSpilling());
 
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
index 4f7904cb56..cbcc85070c 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -313,8 +313,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
         capacity_ = 0;
         string details = Substitute(
                 "PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage);
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, details, mem_usage, st);
+        RETURN_LIMIT_EXCEEDED(state, details, mem_usage, st);
     }
 
     int expr_values_size = expr_values_bytes_per_row_ * capacity_;
diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp
index 754f1f4dbd..a53031fc12 100644
--- a/be/src/exprs/anyval_util.cpp
+++ b/be/src/exprs/anyval_util.cpp
@@ -47,8 +47,7 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool, const TypeDescriptor
     *result = reinterpret_cast<AnyVal*>(
             pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst));
     if (*result == nullptr) {
-        RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
-                              state, mem_limit_exceeded_msg, anyval_size, rst);
+        RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size, rst);
     }
     memset(static_cast<void*>(*result), 0, anyval_size);
     return Status::OK();
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index 7076f1372f..8cd8fe7c1c 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -401,9 +401,8 @@ Status ExprContext::get_const_value(RuntimeState* state, Expr& expr, AnyVal** co
             Status rst;
             char* ptr_copy = reinterpret_cast<char*>(_pool->try_allocate(sv->len, &rst));
             if (ptr_copy == nullptr) {
-                RETURN_LIMIT_EXCEEDED(
-                        thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), state,
-                        "Could not allocate constant string value", sv->len, rst);
+                RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant string value", sv->len,
+                                      rst);
             }
             memcpy(ptr_copy, sv->ptr, sv->len);
             sv->ptr = reinterpret_cast<uint8_t*>(ptr_copy);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 727206e4f0..9a9d1c808f 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -50,7 +50,7 @@ LoadChannel::~LoadChannel() {
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
 
 Status LoadChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     for (auto& it : _tablets_channels) {
         it.second->cancel();
     }
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 299272ae96..4137c7fafc 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -129,7 +129,7 @@ private:
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
                               TabletWriterAddResult* response) {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 9f00c23499..4e4af1723d 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -73,8 +73,9 @@ public:
 
 public:
     bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); }
+    // Return true, no exceeded limit
     bool check_limit(int64_t limit, int64_t bytes) const {
-        return limit >= 0 && limit < consumption() + bytes;
+        return limit >= 0 && limit > consumption() + bytes;
     }
 
     // Usually, a negative values means that the statistics are not accurate,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 7dcfd80dc7..1f3fb89800 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -270,8 +270,11 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
     return Status::OK();
 }
 
-#define RETURN_LIMIT_EXCEEDED(tracker, ...) return tracker->mem_limit_exceeded(__VA_ARGS__);
-#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
-    if (tracker->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                                   \
+    return thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->mem_limit_exceeded( \
+            state, msg, ##__VA_ARGS__);
+#define RETURN_IF_LIMIT_EXCEEDED(state, msg)                                                    \
+    if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded()) \
+        RETURN_LIMIT_EXCEEDED(state, msg);
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 8947e019bd..86f2976f18 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -42,7 +42,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
                 ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent));
             });
     if (new_emplace) {
-        LOG(INFO) << "Register task memory tracker, task id: " << task_id
+        LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
                   << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
     }
     return _task_mem_trackers[task_id];
@@ -109,11 +109,11 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
     for (auto tid : expired_tasks) {
         if (!_task_mem_trackers[tid]) {
             _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister null task memory tracker, task id: " << tid;
+            LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid;
         } else {
             delete _task_mem_trackers[tid];
             _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister not used task memory tracker, task id: " << tid;
+            LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid;
         }
     }
 }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 054390d990..02660730c3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -352,7 +352,7 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
 Status RuntimeState::check_query_state(const std::string& msg) {
     // TODO: it would be nice if this also checked for cancellation, but doing so breaks
     // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
-    RETURN_IF_LIMIT_EXCEEDED(_instance_mem_tracker, this, msg);
+    RETURN_IF_LIMIT_EXCEEDED(this, msg);
     return query_status();
 }
 
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index a4ebc7ea02..6f419aea16 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -20,11 +20,11 @@
 // **Note**: default db will be create if not exist
 defaultDb = "regression_test"
 
-jdbcUrl = "jdbc:mysql://127.0.0.1:9083/?"
+jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
 jdbcUser = "root"
 jdbcPassword = ""
 
-feHttpAddress = "127.0.0.1:8033"
+feHttpAddress = "127.0.0.1:8030"
 feHttpUser = "root"
 feHttpPassword = ""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org