You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/27 10:59:30 UTC
[doris] branch master updated: [fix] (mem tracker) Fix MemTracker accuracy (#11190)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6bdb3bdbc [fix] (mem tracker) Fix MemTracker accuracy (#11190)
b6bdb3bdbc is described below
commit b6bdb3bdbcc81d6751b21044a09485a3be7bf07e
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Jul 27 18:59:24 2022 +0800
[fix] (mem tracker) Fix MemTracker accuracy (#11190)
---
be/src/common/config.h | 2 +-
be/src/exec/broker_scan_node.cpp | 2 +
be/src/exec/es_http_scan_node.cpp | 1 +
be/src/exec/tablet_sink.cpp | 1 -
be/src/http/ev_http_server.cpp | 2 +
be/src/olap/compaction.cpp | 8 +++-
be/src/olap/delta_writer.cpp | 39 ++++++++++++-----
be/src/olap/delta_writer.h | 11 +++--
be/src/olap/memtable.cpp | 5 +--
be/src/olap/memtable.h | 4 +-
be/src/olap/memtable_flush_executor.cpp | 52 ++++++++++++----------
be/src/olap/memtable_flush_executor.h | 7 ++-
be/src/olap/olap_server.cpp | 10 ++++-
be/src/olap/storage_engine.cpp | 7 ++-
be/src/olap/storage_engine.h | 2 +
be/src/runtime/disk_io_mgr.cc | 9 ++--
be/src/runtime/disk_io_mgr.h | 2 +-
be/src/runtime/exec_env_init.cpp | 1 +
be/src/runtime/fragment_mgr.cpp | 5 +--
be/src/runtime/load_channel.cpp | 4 +-
be/src/runtime/load_channel.h | 1 -
be/src/runtime/memory/mem_tracker.cpp | 8 ++--
be/src/runtime/memory/mem_tracker.h | 6 +--
be/src/runtime/memory/mem_tracker_limiter.h | 8 +++-
be/src/runtime/memory/mem_tracker_task_pool.cpp | 56 +++++++++---------------
be/src/runtime/memory/mem_tracker_task_pool.h | 7 +--
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 8 ++++
be/src/runtime/memory/thread_mem_tracker_mgr.h | 18 ++++++++
be/src/runtime/tablets_channel.cpp | 15 +++----
be/src/runtime/tablets_channel.h | 7 ++-
be/src/runtime/thread_context.cpp | 19 --------
be/src/runtime/thread_context.h | 1 -
be/src/service/doris_main.cpp | 4 +-
be/src/vec/exec/vbroker_scan_node.cpp | 3 ++
be/src/vec/exec/ves_http_scan_node.cpp | 1 +
be/src/vec/sink/vtablet_sink.cpp | 1 +
be/test/olap/delta_writer_test.cpp | 6 +--
37 files changed, 192 insertions(+), 151 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9891df2f18..d1f43eaeba 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -653,7 +653,7 @@ CONF_Bool(memory_verbose_track, "false");
// smaller than this value will continue to accumulate. specified as number of bytes.
// Decreasing this value will increase the frequency of consume/release.
// Increasing this value will cause MemTracker statistics to be inaccurate.
-CONF_mInt32(mem_tracker_consume_min_size_bytes, "4194304");
+CONF_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
// The version information of the tablet will be stored in the memory
// in an adjacency graph data structure.
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 9bc920a9b6..0b5fe1d940 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -373,6 +373,8 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
}
void BrokerScanNode::scanner_worker(int start_idx, int length) {
+ SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs);
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index acfc14bb7d..ec23253a5b 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -419,6 +419,7 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) {
void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
DCHECK(start_idx < length);
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 6254d906bd..ef3d97c2bf 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -600,7 +600,6 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
int64_t tablet_id) {
- SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
const auto& it = _tablets_by_channel.find(node_id);
if (it == _tablets_by_channel.end()) {
return;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 2d9031383a..bf612ca830 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -33,6 +33,7 @@
#include "http/http_handler.h"
#include "http/http_headers.h"
#include "http/http_request.h"
+#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/debug_util.h"
#include "util/threadpool.h"
@@ -98,6 +99,7 @@ void EvHttpServer::start() {
_event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
CHECK(_workers->submit_func([this, i]() {
+ thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
event_base_free(base);
});
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index aa6ff0f6d3..a456370f85 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -41,7 +41,13 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
#endif
}
-Compaction::~Compaction() {}
+Compaction::~Compaction() {
+#ifndef BE_TEST
+ // The compaction tracker cannot be completely accurate, so offset its global impact.
+ StorageEngine::instance()->compaction_mem_tracker()->consumption_revise(
+ -_mem_tracker->consumption());
+#endif
+}
Status Compaction::compact() {
RETURN_NOT_OK(prepare_compact());
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 73e9d7b992..04b7327f25 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -30,12 +30,14 @@
namespace doris {
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, bool is_vec) {
- *writer = new DeltaWriter(req, StorageEngine::instance(), is_vec);
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, MemTrackerLimiter* parent_tracker,
+ bool is_vec) {
+ *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec);
return Status::OK();
}
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec)
+DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
+ MemTrackerLimiter* parent_tracker, bool is_vec)
: _req(*req),
_tablet(nullptr),
_cur_rowset(nullptr),
@@ -43,6 +45,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool
_tablet_schema(new TabletSchema),
_delta_written_success(false),
_storage_engine(storage_engine),
+ _parent_tracker(parent_tracker),
_is_vec(is_vec) {}
DeltaWriter::~DeltaWriter() {
@@ -95,9 +98,9 @@ Status DeltaWriter::init() {
<< ", schema_hash=" << _req.schema_hash;
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
}
-
- _flushed_mem_tracker = std::make_unique<MemTracker>(
- fmt::format("DeltaWriter:tabletId={}", std::to_string(_tablet->tablet_id())));
+ _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker);
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
@@ -147,7 +150,10 @@ Status DeltaWriter::write(Tuple* tuple) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+ int64_t prev_memtable_usage = _mem_table->memory_usage();
_mem_table->insert(tuple);
+ THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage,
+ _mem_tracker.get());
// if memtable is full, push it to the flush executor,
// and create a new memtable for incoming data
@@ -171,9 +177,15 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+
+ // The memory recorded automatically by the hook is lower than the real value, because memory that
+ // does not belong to DeltaWriter is freed in multiple places. So manual tracking is used instead.
+ int64_t prev_memtable_usage = _mem_table->memory_usage();
for (const auto& row_idx : row_idxs) {
_mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
}
+ THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage,
+ _mem_tracker.get());
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_flush_memtable_async());
@@ -196,6 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
_mem_table->insert(block, row_idxs);
if (_mem_table->need_to_agg()) {
@@ -213,8 +226,7 @@ Status DeltaWriter::_flush_memtable_async() {
if (++_segment_counter > config::max_segment_num_per_rowset) {
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
}
- _flushed_mem_tracker->consume(_mem_table->memory_usage());
- return _flush_token->submit(_mem_table);
+ return _flush_token->submit(std::move(_mem_table), _mem_tracker.get());
}
Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
@@ -231,6 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
if (mem_consumption() == _mem_table->memory_usage()) {
// equal means there is no memtable in flush queue, just flush this memtable
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
@@ -267,7 +280,7 @@ Status DeltaWriter::wait_flush() {
void DeltaWriter::_reset_mem_table() {
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(),
_req.slots, _req.tuple_desc, _tablet->keys_type(),
- _rowset_writer.get(), _flushed_mem_tracker.get(), _is_vec));
+ _rowset_writer.get(), _is_vec));
}
Status DeltaWriter::close() {
@@ -285,6 +298,7 @@ Status DeltaWriter::close() {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
RETURN_NOT_OK(_flush_memtable_async());
_mem_table.reset();
return Status::OK();
@@ -299,6 +313,7 @@ Status DeltaWriter::close_wait() {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
@@ -350,12 +365,12 @@ int64_t DeltaWriter::get_mem_consumption_snapshot() const {
}
int64_t DeltaWriter::mem_consumption() const {
- if (_flushed_mem_tracker == nullptr) {
+ if (_mem_tracker == nullptr) {
// This method may be called before this writer is initialized.
- // So _flushed_mem_tracker may be null.
+ // So _mem_tracker may be null.
return 0;
}
- return _flushed_mem_tracker->consumption() + _mem_table->memory_usage();
+ return _mem_tracker->consumption();
}
int64_t DeltaWriter::partition_id() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 0c11b8fcbe..1ce62de338 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -55,7 +55,8 @@ struct WriteRequest {
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriter {
public:
- static Status open(WriteRequest* req, DeltaWriter** writer, bool is_vec = false);
+ static Status open(WriteRequest* req, DeltaWriter** writer,
+ MemTrackerLimiter* parent_tracker = nullptr, bool is_vec = false);
~DeltaWriter();
@@ -100,7 +101,8 @@ public:
int64_t get_mem_consumption_snapshot() const;
private:
- DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec);
+ DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, MemTrackerLimiter* parent_tracker,
+ bool is_vec);
// push a full memtable to flush executor
Status _flush_memtable_async();
@@ -120,7 +122,7 @@ private:
RowsetSharedPtr _cur_rowset;
std::unique_ptr<RowsetWriter> _rowset_writer;
// TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
- std::shared_ptr<MemTable> _mem_table;
+ std::unique_ptr<MemTable> _mem_table;
std::unique_ptr<Schema> _schema;
//const TabletSchema* _tablet_schema;
// tablet schema owned by delta writer, all write will use this tablet schema
@@ -131,7 +133,8 @@ private:
StorageEngine* _storage_engine;
std::unique_ptr<FlushToken> _flush_token;
- std::unique_ptr<MemTracker> _flushed_mem_tracker;
+ std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ MemTrackerLimiter* _parent_tracker;
// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a44c54c552..46607f6a6c 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -31,8 +31,7 @@ namespace doris {
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker,
- bool support_vec)
+ KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec)
: _tablet_id(tablet_id),
_schema(schema),
_tablet_schema(tablet_schema),
@@ -40,7 +39,6 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_keys_type(keys_type),
_mem_tracker(std::make_unique<MemTracker>(
fmt::format("MemTable:tabletId={}", std::to_string(tablet_id)))),
- _writer_mem_tracker(writer_mem_tracker),
_buffer_mem_pool(new MemPool(_mem_tracker.get())),
_table_mem_pool(new MemPool(_mem_tracker.get())),
_schema_size(_schema->schema_size()),
@@ -134,7 +132,6 @@ MemTable::~MemTable() {
}
}
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
- _writer_mem_tracker->release(_mem_tracker->consumption());
_mem_tracker->release(_mem_usage);
_buffer_mem_pool->free_all();
_table_mem_pool->free_all();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index d92e7773a9..9be1f7d33d 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -42,8 +42,7 @@ class MemTable {
public:
MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker,
- bool support_vec = false);
+ KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec = false);
~MemTable();
int64_t tablet_id() const { return _tablet_id; }
@@ -153,7 +152,6 @@ private:
std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
std::unique_ptr<MemTracker> _mem_tracker;
- MemTracker* _writer_mem_tracker;
// This is a buffer, to hold the memory referenced by the rows that have not
// been inserted into the SkipList
std::unique_ptr<MemPool> _buffer_mem_pool;
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 3e722019e4..b9e622a9a7 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -26,6 +26,30 @@
namespace doris {
+class MemtableFlushTask final : public Runnable {
+public:
+ MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
+ int64_t submit_task_time, MemTrackerLimiter* tracker)
+ : _flush_token(flush_token),
+ _memtable(std::move(memtable)),
+ _submit_task_time(submit_task_time),
+ _tracker(tracker) {}
+
+ ~MemtableFlushTask() override = default;
+
+ void run() override {
+ SCOPED_ATTACH_TASK(_tracker, ThreadContext::TaskType::LOAD);
+ _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
+ _memtable.reset();
+ }
+
+private:
+ FlushToken* _flush_token;
+ std::unique_ptr<MemTable> _memtable;
+ int64_t _submit_task_time;
+ MemTrackerLimiter* _tracker;
+};
+
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
<< ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
@@ -34,20 +58,15 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
return os;
}
-// The type of parameter is safe to be a reference. Because the function object
-// returned by std::bind() will increase the reference count of Memtable. i.e.,
-// after the submit() method returns, even if the caller immediately releases the
-// passed shared_ptr object, the Memtable object will not be destructed because
-// its reference count is not 0.
-Status FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
+Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker) {
ErrorCode s = _flush_status.load();
if (s != OLAP_SUCCESS) {
return Status::OLAPInternalError(s);
}
int64_t submit_task_time = MonotonicNanos();
- _flush_token->submit_func(
- std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time));
- return Status::OK();
+ auto task = std::make_shared<MemtableFlushTask>(this, std::move(mem_table), submit_task_time,
+ tracker);
+ return _flush_token->submit(std::move(task));
}
void FlushToken::cancel() {
@@ -60,21 +79,8 @@ Status FlushToken::wait() {
return s == OLAP_SUCCESS ? Status::OK() : Status::OLAPInternalError(s);
}
-void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
-#ifndef BE_TEST
- // The memtable mem tracker needs to be completely accurate,
- // because DeltaWriter judges whether to flush memtable according to the memtable memory usage.
- // The macro of attach thread mem tracker is affected by the destructuring order of local variables,
- // so it cannot completely correspond to the number of new/delete bytes recorded in scoped,
- // and there is a small range of errors. So direct track load mem tracker.
- // TODO(zxy) After rethinking the use of switch thread mem tracker, choose the appropriate way to get
- // load mem tracke here.
- // DCHECK(memtable->mem_tracker()->parent_task_mem_tracker_no_own());
- // SCOPED_ATTACH_TASK(ThreadContext::TaskType::LOAD,
- // memtable->mem_tracker()->parent_task_mem_tracker_no_own());
-#endif
+void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) {
_stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
- SCOPED_CLEANUP({ memtable.reset(); });
// If previous flush has failed, return directly
if (_flush_status.load() != OLAP_SUCCESS) {
return;
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 36d5c21be8..6f986af3f5 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -31,6 +31,7 @@ class DataDir;
class DeltaWriter;
class ExecEnv;
class MemTable;
+class MemTrackerLimiter;
// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
@@ -56,7 +57,7 @@ public:
explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
: _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {}
- Status submit(const std::shared_ptr<MemTable>& mem_table);
+ Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker);
// error has happpens, so we cancel this token
// And remove all tasks in the queue.
@@ -69,7 +70,9 @@ public:
const FlushStatistic& get_stats() const { return _stats; }
private:
- void _flush_memtable(std::shared_ptr<MemTable> mem_table, int64_t submit_task_time);
+ friend class MemtableFlushTask;
+
+ void _flush_memtable(MemTable* mem_table, int64_t submit_task_time);
std::unique_ptr<ThreadPoolToken> _flush_token;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 28e9241985..ef53eeaee4 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -111,14 +111,20 @@ Status StorageEngine::start_bg_threads() {
scoped_refptr<Thread> path_scan_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_scan_thread",
- [this, data_dir]() { this->_path_scan_thread_callback(data_dir); },
+ [this, data_dir]() {
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ this->_path_scan_thread_callback(data_dir);
+ },
&path_scan_thread));
_path_scan_threads.emplace_back(path_scan_thread);
scoped_refptr<Thread> path_gc_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_gc_thread",
- [this, data_dir]() { this->_path_gc_thread_callback(data_dir); },
+ [this, data_dir]() {
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ this->_path_gc_thread_callback(data_dir);
+ },
&path_gc_thread));
_path_gc_threads.emplace_back(path_gc_thread);
}
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 0179f3f064..9a7c414971 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -122,6 +122,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")),
_consistency_mem_tracker(
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Consistency")),
+ _mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Self")),
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
_txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
@@ -166,7 +167,8 @@ StorageEngine::~StorageEngine() {
void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
std::vector<std::thread> threads;
for (auto data_dir : data_dirs) {
- threads.emplace_back([data_dir] {
+ threads.emplace_back([this, data_dir] {
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
auto res = data_dir->load();
if (!res.ok()) {
LOG(WARNING) << "io error when init load tables. res=" << res
@@ -217,7 +219,8 @@ Status StorageEngine::_init_store_map() {
DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium,
_tablet_manager.get(), _txn_manager.get());
tmp_stores.emplace_back(store);
- threads.emplace_back([store, &error_msg_lock, &error_msg]() {
+ threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
+ SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
auto st = store->init();
if (!st.ok()) {
{
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index df018e27d3..d508a617c5 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -346,6 +346,8 @@ private:
std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
// Count the memory consumption of all EngineChecksumTask.
std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker;
+ // Mem tracker for the StorageEngine itself.
+ std::unique_ptr<MemTrackerLimiter> _mem_tracker;
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _unused_rowset_monitor_thread;
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index f2c68b1e99..e56c4bff69 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -373,8 +373,8 @@ Status DiskIoMgr::init(const int64_t mem_limit) {
ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
// _disk_thread_group.AddThread(new Thread("disk-io-mgr", ss.str(),
// &DiskIoMgr::work_loop, this, _disk_queues[i]));
- _disk_thread_group.add_thread(new std::thread(
- std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i], _mem_tracker.get())));
+ _disk_thread_group.add_thread(
+ new std::thread(std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i])));
}
}
_request_context_cache.reset(new RequestContextCache(this));
@@ -947,7 +947,7 @@ void DiskIoMgr::handle_read_finished(DiskQueue* disk_queue, RequestContext* read
state.decrement_request_thread();
}
-void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker) {
+void DiskIoMgr::work_loop(DiskQueue* disk_queue) {
// The thread waits until there is work or the entire system is being shut down.
// If there is work, performs the read or write requested and re-enqueues the
// requesting context.
@@ -960,8 +960,7 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker)
// 3. Perform the read or write as specified.
// Cancellation checking needs to happen in both steps 1 and 3.
- // tracked in the process tracker
- // SCOPED_ATTACH_TASK(ThreadContext::TaskType::STORAGE, mem_tracker);
+ thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
while (!_shut_down) {
RequestContext* worker_context = nullptr;
;
diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h
index 3279ac2559..9d0aa2f5ae 100644
--- a/be/src/runtime/disk_io_mgr.h
+++ b/be/src/runtime/disk_io_mgr.h
@@ -795,7 +795,7 @@ private:
// Disk worker thread loop. This function retrieves the next range to process on
// the disk queue and invokes read_range() or Write() depending on the type of Range().
// There can be multiple threads per disk running this loop.
- void work_loop(DiskQueue* queue, MemTrackerLimiter* mem_tracker);
+ void work_loop(DiskQueue* queue);
// This is called from the disk thread to get the next range to process. It will
// wait until a scan range and buffer are available, or a write range is available.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a35ae80ee3..8281375331 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -191,6 +191,7 @@ Status ExecEnv::_init_mem_tracker() {
}
_process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, "Process");
thread_context()->_thread_mem_tracker_mgr->init();
+ thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
if (doris::config::enable_tcmalloc_hook) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c5fcfa97c4..7b16f853ce 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -477,6 +477,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
std::string func_name {"PlanFragmentExecutor::_exec_actual"};
#ifndef BE_TEST
auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
+ SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
#else
auto span = telemetry::get_noop_tracer()->StartSpan(func_name);
#endif
@@ -493,9 +494,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
.query_id(exec_state->query_id())
.instance_id(exec_state->fragment_instance_id())
.tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
-#ifndef BE_TEST
- SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
-#endif
+
exec_state->execute();
std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9a9d1c808f..e1fc0a3463 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -50,7 +50,6 @@ LoadChannel::~LoadChannel() {
}
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
- // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
@@ -61,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
} else {
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
- channel.reset(new TabletsChannel(key, _is_high_priority, _is_vec));
+ channel.reset(new TabletsChannel(key, _mem_tracker.get(), _is_high_priority, _is_vec));
_tablets_channels.insert({index_id, channel});
}
}
@@ -137,7 +136,6 @@ bool LoadChannel::is_finished() {
Status LoadChannel::cancel() {
std::lock_guard<std::mutex> l(_lock);
- // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
for (auto& it : _tablets_channels) {
it.second->cancel();
}
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4137c7fafc..ad8a476fcf 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -129,7 +129,6 @@ private:
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
- // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index dec04a20f5..0f4e185e46 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -80,12 +80,14 @@ MemTracker::~MemTracker() {
MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) {
// First time this label registered, make a new object, otherwise do nothing.
// Avoid using locks to resolve erase conflicts.
+ MemTracker* tracker;
_static_mem_trackers.lazy_emplace_l(
- label, [&](MemTracker*) {},
+ label, [&](MemTracker* v) { tracker = v; },
[&](const auto& ctor) {
- ctor(label, new MemTracker(fmt::format("[Static]-{}", label)));
+ tracker = new MemTracker(fmt::format("[Static]-{}", label));
+ ctor(label, tracker);
});
- return _static_mem_trackers[label];
+ return tracker;
}
MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 4e4af1723d..9f6e021a3c 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -30,9 +30,9 @@ class MemTrackerLimiter;
// MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER,
// which will automatically track all memory usage of the code segment where it is located.
//
-// MemTracker's father can only be MemTrackerLimiter, which is only used to print tree-like statistics.
-// Consuming MemTracker will not consume its father synchronously.
-// Usually, it is not necessary to specify the father. by default, the MemTrackerLimiter in the thread context
+// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics.
+// Consuming MemTracker will not consume its parent synchronously.
+// Usually, it is not necessary to specify the parent. by default, the MemTrackerLimiter in the thread context
// is used as the parent, which is specified when the thread starts.
//
// This class is thread-safe.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 1f3fb89800..786ad945bf 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -119,7 +119,13 @@ public:
Status try_gc_memory(int64_t bytes);
public:
- void consumption_revise(int64_t bytes) { _consumption->add(bytes); }
+ // Used to revise the consumption of a mem tracker.
+ // If memory is allocated and freed in different places, the consumption value of the mem tracker will be inaccurate.
+ // But the consumption value of the process mem tracker is not affected.
+ void consumption_revise(int64_t bytes) {
+ DCHECK(_label != "Process");
+ _consumption->add(bytes);
+ }
// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 86f2976f18..a4831114cf 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -23,11 +23,6 @@
namespace doris {
-// When MemTracker is a negative value, it is considered that a memory leak has occurred,
-// but the actual MemTracker records inaccurately will also cause a negative value,
-// so this feature is in the experimental stage.
-const bool QUERY_MEMORY_LEAK_DETECTION = false;
-
MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
int64_t mem_limit,
const std::string& label,
@@ -37,15 +32,15 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
// Combine new tracker and emplace into one operation to avoid the use of locks
// Name for task MemTrackers. '$0' is replaced with the task id.
bool new_emplace = _task_mem_trackers.lazy_emplace_l(
- task_id, [&](MemTrackerLimiter*) {},
+ task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
[&](const auto& ctor) {
- ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent));
+ ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent));
});
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
<< " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
}
- return _task_mem_trackers[task_id];
+ return _task_mem_trackers[task_id].get();
}
MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
@@ -67,35 +62,35 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t
DCHECK(!task_id.empty());
MemTrackerLimiter* tracker = nullptr;
// Avoid using locks to resolve erase conflicts
- _task_mem_trackers.if_contains(task_id, [&tracker](MemTrackerLimiter* v) { tracker = v; });
+ _task_mem_trackers.if_contains(
+ task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { tracker = v.get(); });
return tracker;
}
void MemTrackerTaskPool::logout_task_mem_tracker() {
- std::vector<std::string> expired_tasks;
- for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) {
+ for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) {
if (!it->second) {
- // https://github.com/apache/incubator-doris/issues/10006
- expired_tasks.emplace_back(it->first);
+ // Unknown exception case with high concurrency, after _task_mem_trackers.erase,
+ // the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006
+ _task_mem_trackers._erase(it++);
} else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) {
// No RuntimeState uses this task MemTracker, it is only referenced by this map,
// and tracker was not created soon, delete it.
- if (QUERY_MEMORY_LEAK_DETECTION && it->second->consumption() != 0) {
- // If consumption is not equal to 0 before query mem tracker is destructed,
- // there are two possibilities in theory.
- // 1. A memory leak occurs.
- // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on
- // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers.
- //
- // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak
- // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate.
- LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string();
- }
+ //
+ // If consumption is not equal to 0 before query mem tracker is destructed,
+ // there are two possibilities in theory.
+ // 1. A memory leak occurs.
+ // 2. memory consumed on query mem tracker, released on other trackers, and no manual transfer
+ // between the two trackers.
+ // At present, it is impossible to effectively locate which memory is consumed and released on different trackers,
+ // so query memory leaks cannot be found.
+ //
// In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
// the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
// the negative number of the current value of consume.
it->second->parent()->consumption_revise(-it->second->consumption());
- expired_tasks.emplace_back(it->first);
+ LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
+ _task_mem_trackers._erase(it++);
} else {
// Log limit exceeded query tracker.
if (it->second->limit_exceeded()) {
@@ -104,16 +99,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
0, Status::OK());
}
- }
- }
- for (auto tid : expired_tasks) {
- if (!_task_mem_trackers[tid]) {
- _task_mem_trackers.erase(tid);
- LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid;
- } else {
- delete _task_mem_trackers[tid];
- _task_mem_trackers.erase(tid);
- LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid;
+ ++it;
}
}
}
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
index ae7d82caf4..4890d72713 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -24,9 +24,10 @@
namespace doris {
using TaskTrackersMap = phmap::parallel_flat_hash_map<
- std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
- phmap::priv::hash_default_eq<std::string>,
- std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;
+ std::string, std::shared_ptr<MemTrackerLimiter>,
+ phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>,
+ std::allocator<std::pair<const std::string, std::shared_ptr<MemTrackerLimiter>>>, 12,
+ std::mutex>;
// Global task pool for query MemTrackers. Owned by ExecEnv.
class MemTrackerTaskPool {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index c129deb8ee..53a47202b1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -29,6 +29,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
const TUniqueId& fragment_instance_id,
MemTrackerLimiter* mem_tracker) {
DCHECK(mem_tracker);
+ flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_exceed_cb.cancel_msg = cancel_msg;
@@ -36,6 +37,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
}
void ThreadMemTrackerMgr::detach_limiter_tracker() {
+#ifndef BE_TEST
+ // Unexpectedly, the runtime state is destructed before the end of the query sub-thread,
+ // (_hash_table_build_thread has appeared) which is not a graceful exit.
+ // consider replacing CHECK with a conditional statement and checking for runtime state survival.
+ CHECK(_task_id == "" ||
+ ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id));
+#endif
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index d32eaaf082..8ccb6f70b1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -19,12 +19,18 @@
#include <fmt/format.h>
#include <parallel_hashmap/phmap.h>
+#include <service/brpc_conflict.h>
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
+// After brpc_conflict.h
+#include <bthread/bthread.h>
namespace doris {
+extern bthread_key_t btls_key;
+static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
+
using ExceedCallBack = void (*)();
struct MemExceedCallBackInfo {
std::string cancel_msg;
@@ -113,6 +119,7 @@ public:
MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
+ void set_check_attach(bool check_attach) { _check_attach = check_attach; }
std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;
@@ -144,6 +151,7 @@ private:
bool _check_limit = false;
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
+ bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
MemExceedCallBackInfo _exceed_cb;
@@ -195,6 +203,16 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = true;
DCHECK(_limiter_tracker);
if (CheckLimit) {
+#ifndef BE_TEST
+ // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker.
+ // If _check_attach is true and it is not in the brpc server (the protobuf will be operated when bthread is started),
+ // it will check whether the tracker label is equal to the default "Process" when flushing.
+ // If you do not want this check, call set_check_attach(false).
+ // TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
+ // so disable it and try to open it when memory tracking is not on time.
+ DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
+ _limiter_tracker->label() != "Process");
+#endif
Status st = _limiter_tracker->try_consume(_untracked_mem);
if (!st) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index f35ffec2b7..043f4c7eab 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -30,12 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec)
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
+ bool is_high_priority, bool is_vec)
: _key(key),
_state(kInitialized),
_closed_senders(64),
_is_high_priority(is_high_priority),
_is_vec(is_vec) {
+ _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker);
static std::once_flag once_flag;
std::call_once(once_flag, [] {
REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
@@ -207,14 +210,6 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
return Status::OK();
}
-int64_t TabletsChannel::mem_consumption() {
- int64_t mem_usage = 0;
- for (auto& it : _tablet_writers) {
- mem_usage += it.second->mem_consumption();
- }
- return mem_usage;
-}
-
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
@@ -245,7 +240,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
wrequest.ptable_schema_param = request.schema();
DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&wrequest, &writer, _is_vec);
+ auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), _is_vec);
if (!st.ok()) {
std::stringstream ss;
ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index bd7504cd2f..318dd879a4 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -60,7 +60,8 @@ class OlapTableSchemaParam;
// Write channel for a particular (load, index).
class TabletsChannel {
public:
- TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec);
+ TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
+ bool is_high_priority, bool is_vec);
~TabletsChannel();
@@ -88,7 +89,7 @@ public:
// no-op when this channel has been closed or cancelled
Status reduce_mem_usage(int64_t mem_limit);
- int64_t mem_consumption();
+ int64_t mem_consumption() const { return _mem_tracker->consumption(); }
private:
template <typename Request>
@@ -143,6 +144,8 @@ private:
static std::atomic<uint64_t> _s_tablet_writer_count;
+ std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+
bool _is_high_priority = false;
bool _is_vec = false;
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 5bfa58fa39..6071defe64 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -37,25 +37,6 @@ AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const ThreadContext::Task
#endif
}
-// AttachTask::AttachTask(const TQueryType::type& query_type,
-// MemTrackerLimiter* mem_tracker) {
-// DCHECK(mem_tracker);
-// #ifdef USE_MEM_TRACKER
-// thread_context()->attach_task(query_to_task_type(query_type), "", TUniqueId(), mem_tracker);
-// #endif
-// }
-
-// AttachTask::AttachTask(const TQueryType::type& query_type,
-// MemTrackerLimiter* mem_tracker, const std::string& task_id,
-// const TUniqueId& fragment_instance_id) {
-// DCHECK(task_id != "");
-// DCHECK(fragment_instance_id != TUniqueId());
-// DCHECK(mem_tracker);
-// #ifdef USE_MEM_TRACKER
-// thread_context()->attach_task(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker);
-// #endif
-// }
-
AttachTask::AttachTask(RuntimeState* runtime_state) {
#ifndef BE_TEST
DCHECK(print_id(runtime_state->query_id()) != "");
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index f1439d1eea..2c14929432 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -43,7 +43,6 @@ class TUniqueId;
class ThreadContext;
extern bthread_key_t btls_key;
-static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
// see https://github.com/apache/doris/pull/7911
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index c68add8a76..a78bb801b1 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -473,9 +473,7 @@ int main(int argc, char** argv) {
#endif
doris::PerfCounters::refresh_proc_status();
- // TODO(zxy) 10s is too long to clear the expired task mem tracker.
- // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory.
- // It should be actively triggered at the end of query/load.
+ // Clear expired task mem trackers every 1s; a query mem tracker is about 57 bytes.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
sleep(1);
}
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index 0a85a60aa7..34cc7ddf26 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -109,6 +109,7 @@ Status VBrokerScanNode::start_scanners() {
Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// check if CANCELLED.
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -275,6 +276,8 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
void VBrokerScanNode::scanner_worker(int start_idx, int length) {
START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker");
+ SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
Thread::set_self_name("vbroker_scanner");
Status status = Status::OK();
ScannerCounter counter;
diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp
index 3029da4f8c..3a9a1ef673 100644
--- a/be/src/vec/exec/ves_http_scan_node.cpp
+++ b/be/src/vec/exec/ves_http_scan_node.cpp
@@ -385,6 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level, std::stringstream* out) cons
void VEsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VEsHttpScanNode::scanner_worker");
SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
DCHECK(start_idx < length);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 048395eec5..1dbcc05daa 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -141,6 +141,7 @@ Status VNodeChannel::open_wait() {
}
Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
+ SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 33c4a48737..3126b30f94 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) {
SAFE_DELETE(delta_writer);
// test vec delta writer
- DeltaWriter::open(&write_req, &delta_writer, true);
+ DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
@@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) {
WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
30002, load_id, tuple_desc, &(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, &delta_writer, true);
+ DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;
@@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
30003, load_id, tuple_desc, &(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, &delta_writer, true);
+ DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
ASSERT_NE(delta_writer, nullptr);
MemPool pool;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org