You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/01/03 12:05:30 UTC
[doris] branch master updated: [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4380f1ec54 [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
4380f1ec54 is described below
commit 4380f1ec54935cac3a8b271e061942eebe8ffd9f
Author: Xin Liao <li...@126.com>
AuthorDate: Tue Jan 3 20:05:21 2023 +0800
[Enhancement](load) reduce memory by memory size of global delta writer (#14491)
---
be/src/olap/delta_writer.cpp | 21 ++--
be/src/runtime/load_channel.cpp | 31 ------
be/src/runtime/load_channel.h | 42 ++++---
be/src/runtime/load_channel_mgr.cpp | 169 +++++++++++++++-------------
be/src/runtime/load_channel_mgr.h | 30 ++---
be/src/runtime/tablets_channel.cpp | 215 ++++++++++++------------------------
be/src/runtime/tablets_channel.h | 34 +++---
7 files changed, 228 insertions(+), 314 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 0d35a4b49c..22b7f568d1 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,14 +17,11 @@
#include "olap/delta_writer.h"
-#include "olap/base_compaction.h"
-#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
-#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/row_batch.h"
@@ -223,14 +220,16 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
}
Status DeltaWriter::wait_flush() {
- std::lock_guard<std::mutex> l(_lock);
- if (!_is_init) {
- // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
- // as described in flush_memtable_and_wait()
- return Status::OK();
- }
- if (_is_cancelled) {
- return _cancel_status;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_is_init) {
+ // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
+ // as described in flush_memtable_and_wait()
+ return Status::OK();
+ }
+ if (_is_cancelled) {
+ return _cancel_status;
+ }
}
RETURN_NOT_OK(_flush_token->wait());
return Status::OK();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 886747b5cd..de3d8d3365 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -90,18 +90,6 @@ Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe
return Status::OK();
}
-// lock should be held when calling this method
-bool LoadChannel::_find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel) {
- int64_t max_consume = 0;
- for (auto& it : _tablets_channels) {
- if (it.second->mem_consumption() > max_consume) {
- max_consume = it.second->mem_consumption();
- *channel = it.second;
- }
- }
- return max_consume > 0;
-}
-
bool LoadChannel::is_finished() {
if (!_opened) {
return false;
@@ -118,23 +106,4 @@ Status LoadChannel::cancel() {
return Status::OK();
}
-void LoadChannel::handle_mem_exceed_limit() {
- bool found = false;
- std::shared_ptr<TabletsChannel> channel;
- {
- // lock so that only one thread can check mem limit
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- found = _find_largest_consumption_channel(&channel);
- }
- // Release lock so that other threads can still call add_batch concurrently.
- if (found) {
- DCHECK(channel != nullptr);
- channel->reduce_mem_usage();
- } else {
- // should not happen, add log to observe
- LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
- << "load_id=" << _load_id;
- }
-}
-
} // namespace doris
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index d3463c3db5..2fad48c79d 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,18 +17,17 @@
#pragma once
+#include <functional>
+#include <map>
#include <mutex>
#include <ostream>
#include <unordered_map>
#include <unordered_set>
#include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
#include "util/uid_util.h"
namespace doris {
@@ -59,12 +58,6 @@ public:
const UniqueId& load_id() const { return _load_id; }
- // check if this load channel mem consumption exceeds limit.
- // If yes, it will pick a tablets channel to try to reduce memory consumption.
- // The method will not return until the chosen tablet channels finished memtable
- // flush.
- void handle_mem_exceed_limit();
-
int64_t mem_consumption() {
int64_t mem_usage = 0;
{
@@ -77,10 +70,37 @@ public:
return mem_usage;
}
+ // Collects a per-writer memory snapshot from every tablets channel of this load channel.
+ // One pair is appended to `writers_mem_snap` per entry of `_tablets_channels`: the entry's
+ // key (the index_id used for lookups in this class) and the multimap filled in by that
+ // channel's own get_writers_mem_consumption_snapshot().
+ // `_tablets_channels_lock` is held for the whole traversal.
+ void get_writers_mem_consumption_snapshot(
+ std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
+ writers_mem_snap) {
+ using WriterMemMap = std::multimap<int64_t, int64_t, std::greater<int64_t>>;
+ std::lock_guard<SpinLock> guard(_tablets_channels_lock);
+ for (auto& channel_entry : _tablets_channels) {
+ WriterMemMap channel_snapshot;
+ channel_entry.second->get_writers_mem_consumption_snapshot(&channel_snapshot);
+ writers_mem_snap->emplace_back(channel_entry.first, std::move(channel_snapshot));
+ }
+ }
+
int64_t timeout() const { return _timeout_s; }
bool is_high_priority() const { return _is_high_priority; }
+ // Forwards a flush request for `tablet_id` to the tablets channel registered under
+ // `index_id` by calling its flush_memtable_async(). Does nothing when no channel is
+ // registered for `index_id`. `_lock` is held for the duration of the call.
+ void flush_memtable_async(int64_t index_id, int64_t tablet_id) {
+ std::lock_guard<std::mutex> guard(_lock);
+ auto channel_it = _tablets_channels.find(index_id);
+ if (channel_it == _tablets_channels.end()) {
+ return;
+ }
+ channel_it->second->flush_memtable_async(tablet_id);
+ }
+
+ // Waits for the flush of `tablet_id` in the tablets channel registered under `index_id`
+ // by calling that channel's wait_flush(). Does nothing when no channel is registered
+ // for `index_id`.
+ void wait_flush(int64_t index_id, int64_t tablet_id) {
+ std::shared_ptr<TabletsChannel> channel;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _tablets_channels.find(index_id);
+ if (it == _tablets_channels.end()) {
+ return;
+ }
+ // keep the channel alive after the lock is dropped
+ channel = it->second;
+ }
+ // Wait without holding `_lock`: the wait can block for a long time and must not
+ // stall other threads that need this load channel's lock in the meantime.
+ channel->wait_flush(tablet_id);
+ }
+
protected:
Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished,
const int64_t index_id);
@@ -107,10 +127,6 @@ protected:
}
private:
- // when mem consumption exceeds limit, should call this method to find the channel
- // that consumes the largest memory(, and then we can reduce its memory usage).
- bool _find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel);
-
UniqueId _load_id;
// Tracks the total memory consumed by current load job on this BE
std::unique_ptr<MemTracker> _mem_tracker;
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 15e9ea0c9e..fe4fe4250b 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -17,11 +17,15 @@
#include "runtime/load_channel_mgr.h"
-#include "gutil/strings/substitute.h"
+#include <functional>
+#include <map>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <vector>
+
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
-#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
@@ -71,12 +75,6 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
_load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
_load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
- // If a load channel's memory consumption is no more than 10% of the hard limit, it's not
- // worth to reduce memory on it. Since we only reduce 1/3 memory for one load channel,
- // for a channel consume 10% of hard limit, we can only release about 3% memory each time,
- // it's not quite helpfull to reduce memory pressure.
- // In this case we need to pick multiple load channels to reduce memory more effectively.
- _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
_mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
_mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
"LoadChannelMgrTrackerSet");
@@ -221,13 +219,16 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
int64_t process_mem_limit = MemInfo::soft_mem_limit();
+ int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache();
if (_mem_tracker->consumption() < _load_soft_mem_limit &&
- MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+ proc_mem_no_allocator_cache < process_mem_limit) {
return;
}
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
- std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
+ // tuple<LoadChannel, index_id, tablet_id, mem_size>
+ std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t, int64_t>>
+ writers_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
@@ -236,102 +237,117 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
_wait_flush_cond.wait(l);
}
bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
- MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
+ proc_mem_no_allocator_cache >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return;
}
- // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
- // due to soft limit, and we reached hard limit now, current thread may pick some
- // duplicate channels and trigger duplicate reducing memory process.
- // But the load channel's reduce memory process is thread safe, only 1 thread can
- // reduce memory at the same time, other threads will wait on a condition variable,
- // after the reduce-memory work finished, all threads will return.
- using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
- std::vector<ChannelMemPair> candidate_channels;
- int64_t total_consume = 0;
+ // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>>
+ using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t,
+ std::multimap<int64_t, int64_t, std::greater<int64_t>>>;
+ std::vector<WritersMem> all_writers_mem;
+
+ // tuple<current iterator in multimap, end iterator in multimap, pos in all_writers_mem>
+ using WriterMemItem =
+ std::tuple<std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+ std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+ size_t>;
+ auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
+ return std::get<0>(lhs)->first < std::get<0>(rhs)->first;
+ };
+ std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)>
+ tablets_mem_heap(cmp);
+
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
- int64_t mem = kv.second->mem_consumption();
- // save the mem consumption, since the calculation might be expensive.
- candidate_channels.push_back(std::make_pair(kv.second, mem));
- total_consume += mem;
+ std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
+ writers_mem_snap;
+ kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
+ // iterate by reference so that std::move() below really steals the multimap
+ // instead of moving from a per-iteration deep copy
+ for (auto& item : writers_mem_snap) {
+ // multimap is empty
+ if (item.second.empty()) {
+ continue;
+ }
+ all_writers_mem.emplace_back(kv.second, item.first, std::move(item.second));
+ }
}
+ // Seed the heap only after all_writers_mem has stopped growing. emplace_back() may
+ // reallocate the vector and relocate the multimaps it owns, and an end() iterator
+ // taken before such a relocation is not guaranteed to stay valid.
+ for (size_t pos = 0; pos < all_writers_mem.size(); ++pos) {
+ auto& writers_mem = std::get<2>(all_writers_mem[pos]);
+ tablets_mem_heap.emplace(writers_mem.begin(), writers_mem.end(), pos);
+ }
- if (candidate_channels.empty()) {
- // should not happen, add log to observe
- LOG(WARNING) << "All load channels are high priority, failed to find suitable"
- << "channels to reduce memory when total load mem limit exceed";
- return;
+ // reduce 1/10 memory every time
+ int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
+ int64_t mem_consumption_in_picked_writer = 0;
+ // Repeatedly take the writer with the largest recorded memory across all channels,
+ // request its flush, and stop once the picked writers add up to more than the target.
+ while (!tablets_mem_heap.empty()) {
+ WriterMemItem tablet_mem_item = tablets_mem_heap.top();
+ size_t pos = std::get<2>(tablet_mem_item);
+ auto load_channel = std::get<0>(all_writers_mem[pos]);
+ int64_t index_id = std::get<1>(all_writers_mem[pos]);
+ int64_t tablet_id = std::get<0>(tablet_mem_item)->second;
+ int64_t mem_size = std::get<0>(tablet_mem_item)->first;
+ writers_to_reduce_mem.emplace_back(load_channel, index_id, tablet_id, mem_size);
+ load_channel->flush_memtable_async(index_id, tablet_id);
+ mem_consumption_in_picked_writer += mem_size;
+ if (mem_consumption_in_picked_writer > mem_to_flushed) {
+ break;
+ }
+ tablets_mem_heap.pop();
+ // Advance first, then test (pre-increment). An entry whose iterator has reached
+ // end() must never go back into the heap, because cmp dereferences the iterator.
+ // end() is read from the multimap itself so the bound is always the live one.
+ if (++std::get<0>(tablet_mem_item) != std::get<2>(all_writers_mem[pos]).end()) {
+ tablets_mem_heap.push(tablet_mem_item);
+ }
+ }
- // sort all load channels, try to find the largest one.
- std::sort(candidate_channels.begin(), candidate_channels.end(),
- [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
- return lhs.second > rhs.second;
- });
-
- int64_t mem_consumption_in_picked_channel = 0;
- auto largest_channel = *candidate_channels.begin();
- // If some load-channel is big enough, we can reduce it only, try our best to avoid
- // reducing small load channels.
- if (_load_channel_min_mem_to_reduce > 0 &&
- largest_channel.second > _load_channel_min_mem_to_reduce) {
- // Pick 1 load channel to reduce memory.
- channels_to_reduce_mem.push_back(largest_channel.first);
- mem_consumption_in_picked_channel = largest_channel.second;
- } else {
- // Pick multiple channels to reduce memory.
- int64_t mem_to_flushed = total_consume / 3;
- for (auto ch : candidate_channels) {
- channels_to_reduce_mem.push_back(ch.first);
- mem_consumption_in_picked_channel += ch.second;
- if (mem_consumption_in_picked_channel >= mem_to_flushed) {
- break;
- }
- }
+ if (writers_to_reduce_mem.empty()) {
+ // should not happen, add log to observe
+ LOG(WARNING) << "failed to find suitable writers to reduce memory"
+ << " when total load mem limit exceed";
+ return;
}
std::ostringstream oss;
- if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
- oss << "reducing memory of " << channels_to_reduce_mem.size()
- << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
- << " bytes), because total load mem consumption "
- << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
- << " has exceeded";
+ oss << "reducing memory of " << writers_to_reduce_mem.size()
+ << " delta writers (total mem: "
+ << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
+ << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
+ << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
+ << "), ";
+ if (proc_mem_no_allocator_cache < process_mem_limit) {
+ oss << "because total load mem consumption "
+ << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
- oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
+ oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit);
} else {
_soft_reduce_mem_in_progress = true;
- oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
+ oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
- oss << "reducing memory of " << channels_to_reduce_mem.size()
- << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
- << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
- << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
- << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
+ oss << "because proc_mem_no_allocator_cache consumption "
+ << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
+ << ", has exceeded process limit " << PrettyPrinter::print_bytes(process_mem_limit)
+ << ", total load mem consumption: "
+ << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+ << ", vm_rss: " << PerfCounters::get_vm_rss_str()
+ << ", tc/jemalloc allocator cache: " << MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}
- for (auto ch : channels_to_reduce_mem) {
- uint64_t begin = GetCurrentTimeMicros();
- int64_t mem_usage = ch->mem_consumption();
- ch->handle_mem_exceed_limit();
- LOG(INFO) << "reduced memory of " << *ch << ", cost "
- << (GetCurrentTimeMicros() - begin) / 1000
- << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
+ // wait all writers flush without lock
+ for (auto item : writers_to_reduce_mem) {
+ VLOG_NOTICE << "reducing memory, wait flush load_id: " << std::get<0>(item)->load_id()
+ << ", index_id: " << std::get<1>(item) << ", tablet_id: " << std::get<2>(item)
+ << ", mem_size: " << PrettyPrinter::print_bytes(std::get<3>(item));
+ std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item));
}
{
@@ -345,8 +361,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
if (_soft_reduce_mem_in_progress) {
_soft_reduce_mem_in_progress = false;
}
+ // refresh mem tracker to avoid duplicate reduce
+ _refresh_mem_tracker_without_lock();
}
- return;
}
} // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 9bf604d87e..967617e9bc 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -24,14 +24,10 @@
#include <unordered_map>
#include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/ref_counted.h"
#include "olap/lru_cache.h"
#include "runtime/load_channel.h"
-#include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
#include "util/countdown_latch.h"
#include "util/thread.h"
#include "util/uid_util.h"
@@ -59,14 +55,8 @@ public:
Status cancel(const PTabletWriterCancelRequest& request);
void refresh_mem_tracker() {
- int64_t mem_usage = 0;
- {
- std::lock_guard<std::mutex> l(_lock);
- for (auto& kv : _load_channels) {
- mem_usage += kv.second->mem_consumption();
- }
- }
- _mem_tracker->set_consumption(mem_usage);
+ std::lock_guard<std::mutex> l(_lock);
+ _refresh_mem_tracker_without_lock();
}
MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
@@ -82,6 +72,15 @@ private:
Status _start_bg_worker();
+ // Sums mem_consumption() over every entry of `_load_channels` and stores the total in
+ // `_mem_tracker`. This method takes no lock itself: `_lock` should already be held by
+ // the caller.
+ void _refresh_mem_tracker_without_lock() {
+ int64_t total_mem = 0;
+ for (auto& channel_entry : _load_channels) {
+ total_mem += channel_entry.second->mem_consumption();
+ }
+ _mem_tracker->set_consumption(total_mem);
+ }
+
protected:
// lock protect the load channel map
std::mutex _lock;
@@ -95,13 +94,6 @@ protected:
std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
- // By default, we try to reduce memory on the load channel with largest mem consumption,
- // but if there are lots of small load channel, even the largest one consumes very
- // small memory, in this case we need to pick multiple load channels to reduce memory
- // more effectively.
- // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
- // memory consumption is big enough.
- int64_t _load_channel_min_mem_to_reduce = -1;
bool _soft_reduce_mem_in_progress = false;
// If hard limit reached, one thread will trigger load channel flush,
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 483fd22edd..089b5ba4be 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -23,10 +23,7 @@
#include "olap/storage_engine.h"
#include "runtime/load_channel.h"
#include "runtime/row_batch.h"
-#include "runtime/thread_context.h"
-#include "runtime/tuple_row.h"
#include "util/doris_metrics.h"
-#include "util/time.h"
namespace doris {
@@ -208,136 +205,16 @@ int64_t TabletsChannel::mem_consumption() {
int64_t mem_usage = 0;
{
std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ _mem_consumptions.clear();
for (auto& it : _tablet_writers) {
- mem_usage += it.second->mem_consumption();
+ int64_t writer_mem = it.second->mem_consumption();
+ mem_usage += writer_mem;
+ _mem_consumptions.emplace(writer_mem, it.first);
}
}
return mem_usage;
}
-void TabletsChannel::reduce_mem_usage() {
- if (_try_to_wait_flushing()) {
- // `_try_to_wait_flushing()` returns true means other thread already
- // reduced the mem usage, and current thread do not need to reduce again.
- LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
- << ", index_id: " << _index_id;
- return;
- }
-
- std::vector<DeltaWriter*> writers_to_wait_flush;
- {
- std::lock_guard<std::mutex> l(_lock);
- if (_state == kFinished) {
- // TabletsChannel is closed without LoadChannel's lock,
- // therefore it's possible for reduce_mem_usage() to be called right after close()
- LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
- << ", index_id: " << _index_id;
- return;
- }
-
- // Sort the DeltaWriters by mem consumption in descend order.
- std::vector<DeltaWriter*> writers;
- for (auto& it : _tablet_writers) {
- it.second->save_mem_consumption_snapshot();
- writers.push_back(it.second);
- }
- int64_t total_memtable_consumption_in_flush = 0;
- for (auto writer : writers) {
- if (writer->get_memtable_consumption_inflush() > 0) {
- writers_to_wait_flush.push_back(writer);
- total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush();
- }
- }
- std::sort(writers.begin(), writers.end(),
- [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
- return lhs->get_memtable_consumption_snapshot() >
- rhs->get_memtable_consumption_snapshot();
- });
-
- // Decide which writes should be flushed to reduce mem consumption.
- // The main idea is to flush at least one third of the mem_limit.
- // This is mainly to solve the following scenarios.
- // Suppose there are N tablets in this TabletsChannel, and the mem limit is M.
- // If the data is evenly distributed, when each tablet memory accumulates to M/N,
- // the reduce memory operation will be triggered.
- // At this time, the value of M/N may be much smaller than the value of `write_buffer_size`.
- // If we flush all the tablets at this time, each tablet will generate a lot of small files.
- // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered,
- // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
-
- int64_t mem_to_flushed = mem_consumption() / 3;
- if (total_memtable_consumption_in_flush < mem_to_flushed) {
- mem_to_flushed -= total_memtable_consumption_in_flush;
- int counter = 0;
- int64_t sum = 0;
- for (auto writer : writers) {
- if (writer->mem_consumption() <= 0) {
- break;
- }
- ++counter;
- sum += writer->mem_consumption();
- if (sum > mem_to_flushed) {
- break;
- }
- }
- std::ostringstream ss;
- ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush
- << " will flush " << counter << " more memtables to reduce memory: " << sum;
- if (counter > 0) {
- ss << ", the size of smallest memtable to flush is "
- << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
- }
- LOG(INFO) << ss.str();
- // following loop flush memtable async, we'll do it with _lock
- for (int i = 0; i < counter; i++) {
- Status st = writers[i]->flush_memtable_and_wait(false);
- if (!st.ok()) {
- LOG_WARNING(
- "tablet writer failed to reduce mem consumption by flushing memtable")
- .tag("tablet_id", writers[i]->tablet_id())
- .tag("txn_id", _txn_id)
- .error(st);
- writers[i]->cancel_with_status(st);
- _broken_tablets.insert(writers[i]->tablet_id());
- }
- }
- for (int i = 0; i < counter; i++) {
- if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
- // skip broken tablets
- continue;
- }
- writers_to_wait_flush.push_back(writers[i]);
- }
- _reducing_mem_usage = true;
- } else {
- LOG(INFO) << "total size of memtables in flush is big enough: "
- << total_memtable_consumption_in_flush
- << " bytes, will not flush more memtables";
- }
- }
-
- for (auto writer : writers_to_wait_flush) {
- Status st = writer->wait_flush();
- if (!st.ok()) {
- LOG_WARNING(
- "tablet writer failed to reduce mem consumption by waiting flushing memtable")
- .tag("tablet_id", writer->tablet_id())
- .tag("txn_id", _txn_id)
- .error(st);
- writer->cancel_with_status(st);
- _broken_tablets.insert(writer->tablet_id());
- }
- }
-
- {
- std::lock_guard<std::mutex> l(_lock);
- _reducing_mem_usage = false;
- _reduce_memory_cond.notify_all();
- }
-
- return;
-}
-
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
@@ -387,26 +264,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
return Status::OK();
}
-bool TabletsChannel::_try_to_wait_flushing() {
- bool duplicate_work = false;
- std::unique_lock<std::mutex> l(_lock);
- // NOTE: we call `reduce_mem_usage()` because we think it's necessary
- // to reduce it's memory and should not write more data into this
- // tablets channel. If there's already some other thead doing the
- // reduce-memory work, the only choice for current thread is to wait
- // here.
- // If current thread do not wait, it has two options:
- // 1. continue to write data to current channel.
- // 2. pick another tablets channel to flush
- // The first choice might cause OOM, the second choice might pick a
- // channel that is not big enough.
- while (_reducing_mem_usage) {
- duplicate_work = true;
- _reduce_memory_cond.wait(l);
- }
- return duplicate_work;
-}
-
Status TabletsChannel::cancel() {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
@@ -500,6 +357,70 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
return Status::OK();
}
+ // Requests a memtable flush for the writer of `tablet_id` without waiting for it to
+ // finish. The whole method runs under `_lock`. The request is dropped when the channel
+ // is already finished, when the tablet has no writer here, or when the tablet is already
+ // recorded in `_reducing_tablets`. If the writer reports an error, the writer is cancelled
+ // with that status and the tablet is added to `_broken_tablets`.
+ void TabletsChannel::flush_memtable_async(int64_t tablet_id) {
+ std::lock_guard<std::mutex> guard(_lock);
+ if (_state == kFinished) {
+ // TabletsChannel is closed without LoadChannel's lock, so a flush request
+ // may still arrive right after close(); it is ignored.
+ LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+ << ", index_id: " << _index_id;
+ return;
+ }
+
+ auto writer_it = _tablet_writers.find(tablet_id);
+ // The insert only runs when the writer was found; a failed insert means the
+ // tablet is already being reduced.
+ if (writer_it == _tablet_writers.end() || !_reducing_tablets.insert(tablet_id).second) {
+ return;
+ }
+
+ auto& writer = writer_it->second;
+ // need_wait == false: ask for the flush and return without waiting for it
+ Status st = writer->flush_memtable_and_wait(false);
+ if (st.ok()) {
+ return;
+ }
+ auto err_msg = fmt::format(
+ "tablet writer failed to reduce mem consumption by flushing memtable, "
+ "tablet_id={}, txn_id={}, err={}",
+ tablet_id, _txn_id, st);
+ LOG(WARNING) << err_msg;
+ writer->cancel_with_status(st);
+ _broken_tablets.insert(writer->tablet_id());
+ }
+
+ // Waits for the flush of the writer of `tablet_id` and then removes the tablet from
+ // `_reducing_tablets`. Returns early when the channel is already finished or the tablet
+ // has no writer here. If the wait reports an error, the writer is cancelled with that
+ // status and the tablet is added to `_broken_tablets`.
+ // Shared state is only touched under `_lock`; the blocking wait itself runs without it.
+ void TabletsChannel::wait_flush(int64_t tablet_id) {
+ DeltaWriter* writer = nullptr;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (_state == kFinished) {
+ // TabletsChannel is closed without LoadChannel's lock, so this call may
+ // still arrive right after close(). Drop the in-progress marker and leave.
+ LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+ << ", index_id: " << _index_id;
+ _reducing_tablets.erase(tablet_id);
+ return;
+ }
+ // look the writer up under `_lock`, the same way flush_memtable_async() does
+ auto iter = _tablet_writers.find(tablet_id);
+ if (iter == _tablet_writers.end()) {
+ return;
+ }
+ writer = iter->second;
+ }
+
+ // the wait may block for a long time, so it is done without holding `_lock`
+ Status st = writer->wait_flush();
+ if (!st.ok()) {
+ auto err_msg = fmt::format(
+ "tablet writer failed to reduce mem consumption by flushing memtable, "
+ "tablet_id={}, txn_id={}, err={}",
+ tablet_id, _txn_id, st);
+ LOG(WARNING) << err_msg;
+ writer->cancel_with_status(st);
+ }
+
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!st.ok()) {
+ // updated under `_lock`, matching flush_memtable_async()
+ _broken_tablets.insert(writer->tablet_id());
+ }
+ _reducing_tablets.erase(tablet_id);
+ }
+ }
+
template Status
TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>(
PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index c774994cb2..e649852a47 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -18,22 +18,19 @@
#pragma once
#include <cstdint>
+#include <functional>
+#include <map>
#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
-#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
#include "util/bitmap.h"
-#include "util/countdown_latch.h"
-#include "util/priority_thread_pool.hpp"
#include "util/uid_util.h"
-#include "vec/core/block.h"
namespace doris {
@@ -87,14 +84,17 @@ public:
// no-op when this channel has been closed or cancelled
Status cancel();
- // upper application may call this to try to reduce the mem usage of this channel.
- // eg. flush the largest memtable immediately.
- // return Status::OK if mem is reduced.
- // no-op when this channel has been closed or cancelled
- void reduce_mem_usage();
-
int64_t mem_consumption();
+ // Copies the per-writer memory table `_mem_consumptions` (memory size -> tablet_id,
+ // ordered by std::greater on the size) into `*mem_consumptions` while holding
+ // `_tablet_writers_lock`.
+ // NOTE(review): within this change `_mem_consumptions` is only rebuilt inside
+ // mem_consumption(), so the copy appears to reflect the most recent call to that
+ // method rather than live usage — confirm there is no other writer.
+ void get_writers_mem_consumption_snapshot(
+ std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
+ std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ *mem_consumptions = _mem_consumptions;
+ }
+
+ void flush_memtable_async(int64_t tablet_id);
+ void wait_flush(int64_t tablet_id);
+
private:
template <typename Request>
Status _get_current_seq(int64_t& cur_seq, const Request& request);
@@ -151,11 +151,7 @@ private:
// So that following batch will not handle this tablet anymore.
std::unordered_set<int64_t> _broken_tablets;
- bool _reducing_mem_usage = false;
- // only one thread can reduce memory for one TabletsChannel.
- // if some other thread call `reduce_memory_usage` at the same time,
- // it will wait on this condition variable.
- std::condition_variable _reduce_memory_cond;
+ std::unordered_set<int64_t> _reducing_tablets;
std::unordered_set<int64_t> _partition_ids;
@@ -164,6 +160,10 @@ private:
bool _is_high_priority = false;
bool _write_single_replica = false;
+
+ // mem -> tablet_id
+ // sort by memory size
+ std::multimap<int64_t, int64_t, std::greater<int64_t>> _mem_consumptions;
};
template <typename Request>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org