You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/22 13:59:42 UTC
[doris] branch master updated: [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ad31b6c902 [bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
ad31b6c902 is described below
commit ad31b6c902a3704bda5f42e4cec80ab085e094c0
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Jul 22 21:59:36 2022 +0800
[bugfix and improvement]fix mem tracker for load and simplify some macros (#11125)
---
be/src/exec/analytic_eval_node.cpp | 3 +--
be/src/exec/es/es_scroll_parser.cpp | 8 ++------
be/src/exec/partitioned_aggregation_node.cc | 6 ++----
be/src/exec/partitioned_hash_table.cc | 3 +--
be/src/exprs/anyval_util.cpp | 3 +--
be/src/exprs/expr_context.cpp | 5 ++---
be/src/runtime/load_channel.cpp | 4 ++--
be/src/runtime/load_channel.h | 2 +-
be/src/runtime/memory/mem_tracker.h | 3 ++-
be/src/runtime/memory/mem_tracker_limiter.h | 9 ++++++---
be/src/runtime/memory/mem_tracker_task_pool.cpp | 6 +++---
be/src/runtime/runtime_state.cpp | 2 +-
regression-test/conf/regression-conf.groovy | 4 ++--
13 files changed, 26 insertions(+), 32 deletions(-)
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 08d2dc2c35..f0e7f01989 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -202,8 +202,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
"Failed to acquire initial read buffer for analytic function "
"evaluation. Reducing query concurrency or increasing the memory limit may "
"help this query to complete successfully.");
- RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
- state, msg);
+ RETURN_LIMIT_EXCEEDED(state, msg);
}
DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index 8dd3a2c9e1..a95af7bb51 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -353,9 +353,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED,
"MaterializeNextRow", len, "string slot");
- RETURN_LIMIT_EXCEEDED(
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
- details, len, rst);
+ RETURN_LIMIT_EXCEEDED(nullptr, details, len, rst);
}
memcpy(buffer, _id.data(), len);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
@@ -415,9 +413,7 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, Tuple* tuple,
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(
ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
- RETURN_LIMIT_EXCEEDED(
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), nullptr,
- details, val_size, rst);
+ RETURN_LIMIT_EXCEEDED(nullptr, details, val_size, rst);
}
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 2811375395..ad5f6788d9 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -411,8 +411,7 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_des
"Cannot perform aggregation at node with id $0."
" Failed to allocate $1 output bytes.",
_id, sv->len);
- RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
- state_, details, sv->len, rst);
+ RETURN_LIMIT_EXCEEDED(state_, details, sv->len, rst);
}
memcpy(new_ptr, sv->ptr, sv->len);
sv->ptr = new_ptr;
@@ -851,8 +850,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
// TODO(ml): enable spill
std::stringstream msg;
msg << "New partitioned Aggregation in spill";
- RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
- parent->state_, msg.str());
+ RETURN_LIMIT_EXCEEDED(parent->state_, msg.str());
RETURN_IF_ERROR(SerializeStreamForSpilling());
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
index 4f7904cb56..cbcc85070c 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -313,8 +313,7 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
capacity_ = 0;
string details = Substitute(
"PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes", mem_usage);
- RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
- state, details, mem_usage, st);
+ RETURN_LIMIT_EXCEEDED(state, details, mem_usage, st);
}
int expr_values_size = expr_values_bytes_per_row_ * capacity_;
diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp
index 754f1f4dbd..a53031fc12 100644
--- a/be/src/exprs/anyval_util.cpp
+++ b/be/src/exprs/anyval_util.cpp
@@ -47,8 +47,7 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool, const TypeDescriptor
*result = reinterpret_cast<AnyVal*>(
pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst));
if (*result == nullptr) {
- RETURN_LIMIT_EXCEEDED(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(),
- state, mem_limit_exceeded_msg, anyval_size, rst);
+ RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size, rst);
}
memset(static_cast<void*>(*result), 0, anyval_size);
return Status::OK();
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index 7076f1372f..8cd8fe7c1c 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -401,9 +401,8 @@ Status ExprContext::get_const_value(RuntimeState* state, Expr& expr, AnyVal** co
Status rst;
char* ptr_copy = reinterpret_cast<char*>(_pool->try_allocate(sv->len, &rst));
if (ptr_copy == nullptr) {
- RETURN_LIMIT_EXCEEDED(
- thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(), state,
- "Could not allocate constant string value", sv->len, rst);
+ RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant string value", sv->len,
+ rst);
}
memcpy(ptr_copy, sv->ptr, sv->len);
sv->ptr = reinterpret_cast<uint8_t*>(ptr_copy);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 727206e4f0..9a9d1c808f 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -50,7 +50,7 @@ LoadChannel::~LoadChannel() {
}
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
@@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
Status LoadChannel::cancel() {
std::lock_guard<std::mutex> l(_lock);
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
for (auto& it : _tablets_channels) {
it.second->cancel();
}
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 299272ae96..4137c7fafc 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -129,7 +129,7 @@ private:
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 9f00c23499..4e4af1723d 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -73,8 +73,9 @@ public:
public:
bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); }
+ // Return true, no exceeded limit
bool check_limit(int64_t limit, int64_t bytes) const {
- return limit >= 0 && limit < consumption() + bytes;
+ return limit >= 0 && limit > consumption() + bytes;
}
// Usually, a negative values means that the statistics are not accurate,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 7dcfd80dc7..1f3fb89800 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -270,8 +270,11 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
return Status::OK();
}
-#define RETURN_LIMIT_EXCEEDED(tracker, ...) return tracker->mem_limit_exceeded(__VA_ARGS__);
-#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
- if (tracker->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
+ return thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->mem_limit_exceeded( \
+ state, msg, ##__VA_ARGS__);
+#define RETURN_IF_LIMIT_EXCEEDED(state, msg) \
+ if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded()) \
+ RETURN_LIMIT_EXCEEDED(state, msg);
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 8947e019bd..86f2976f18 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -42,7 +42,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent));
});
if (new_emplace) {
- LOG(INFO) << "Register task memory tracker, task id: " << task_id
+ LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
<< " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
}
return _task_mem_trackers[task_id];
@@ -109,11 +109,11 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
for (auto tid : expired_tasks) {
if (!_task_mem_trackers[tid]) {
_task_mem_trackers.erase(tid);
- LOG(INFO) << "Deregister null task memory tracker, task id: " << tid;
+ LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid;
} else {
delete _task_mem_trackers[tid];
_task_mem_trackers.erase(tid);
- LOG(INFO) << "Deregister not used task memory tracker, task id: " << tid;
+ LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid;
}
}
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 054390d990..02660730c3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -352,7 +352,7 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
Status RuntimeState::check_query_state(const std::string& msg) {
// TODO: it would be nice if this also checked for cancellation, but doing so breaks
// cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
- RETURN_IF_LIMIT_EXCEEDED(_instance_mem_tracker, this, msg);
+ RETURN_IF_LIMIT_EXCEEDED(this, msg);
return query_status();
}
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index a4ebc7ea02..6f419aea16 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -20,11 +20,11 @@
// **Note**: default db will be create if not exist
defaultDb = "regression_test"
-jdbcUrl = "jdbc:mysql://127.0.0.1:9083/?"
+jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
jdbcUser = "root"
jdbcPassword = ""
-feHttpAddress = "127.0.0.1:8033"
+feHttpAddress = "127.0.0.1:8030"
feHttpUser = "root"
feHttpPassword = ""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org