You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/01/03 12:05:30 UTC

[doris] branch master updated: [Enhancement](load) reduce memory by memory size of global delta writer (#14491)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4380f1ec54 [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
4380f1ec54 is described below

commit 4380f1ec54935cac3a8b271e061942eebe8ffd9f
Author: Xin Liao <li...@126.com>
AuthorDate: Tue Jan 3 20:05:21 2023 +0800

    [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
---
 be/src/olap/delta_writer.cpp        |  21 ++--
 be/src/runtime/load_channel.cpp     |  31 ------
 be/src/runtime/load_channel.h       |  42 ++++---
 be/src/runtime/load_channel_mgr.cpp | 169 +++++++++++++++-------------
 be/src/runtime/load_channel_mgr.h   |  30 ++---
 be/src/runtime/tablets_channel.cpp  | 215 ++++++++++++------------------------
 be/src/runtime/tablets_channel.h    |  34 +++---
 7 files changed, 228 insertions(+), 314 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 0d35a4b49c..22b7f568d1 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,14 +17,11 @@
 
 #include "olap/delta_writer.h"
 
-#include "olap/base_compaction.h"
-#include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/schema.h"
-#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "runtime/load_channel_mgr.h"
 #include "runtime/row_batch.h"
@@ -223,14 +220,16 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
 }
 
 Status DeltaWriter::wait_flush() {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init) {
-        // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
-        // as described in flush_memtable_and_wait()
-        return Status::OK();
-    }
-    if (_is_cancelled) {
-        return _cancel_status;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (!_is_init) {
+            // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
+            // as described in flush_memtable_and_wait()
+            return Status::OK();
+        }
+        if (_is_cancelled) {
+            return _cancel_status;
+        }
     }
     RETURN_NOT_OK(_flush_token->wait());
     return Status::OK();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 886747b5cd..de3d8d3365 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -90,18 +90,6 @@ Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe
     return Status::OK();
 }
 
-// lock should be held when calling this method
-bool LoadChannel::_find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel) {
-    int64_t max_consume = 0;
-    for (auto& it : _tablets_channels) {
-        if (it.second->mem_consumption() > max_consume) {
-            max_consume = it.second->mem_consumption();
-            *channel = it.second;
-        }
-    }
-    return max_consume > 0;
-}
-
 bool LoadChannel::is_finished() {
     if (!_opened) {
         return false;
@@ -118,23 +106,4 @@ Status LoadChannel::cancel() {
     return Status::OK();
 }
 
-void LoadChannel::handle_mem_exceed_limit() {
-    bool found = false;
-    std::shared_ptr<TabletsChannel> channel;
-    {
-        // lock so that only one thread can check mem limit
-        std::lock_guard<SpinLock> l(_tablets_channels_lock);
-        found = _find_largest_consumption_channel(&channel);
-    }
-    // Release lock so that other threads can still call add_batch concurrently.
-    if (found) {
-        DCHECK(channel != nullptr);
-        channel->reduce_mem_usage();
-    } else {
-        // should not happen, add log to observe
-        LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
-                     << "load_id=" << _load_id;
-    }
-}
-
 } // namespace doris
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index d3463c3db5..2fad48c79d 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,18 +17,17 @@
 
 #pragma once
 
+#include <functional>
+#include <map>
 #include <mutex>
 #include <ostream>
 #include <unordered_map>
 #include <unordered_set>
 
 #include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -59,12 +58,6 @@ public:
 
     const UniqueId& load_id() const { return _load_id; }
 
-    // check if this load channel mem consumption exceeds limit.
-    // If yes, it will pick a tablets channel to try to reduce memory consumption.
-    // The method will not return until the chosen tablet channels finished memtable
-    // flush.
-    void handle_mem_exceed_limit();
-
     int64_t mem_consumption() {
         int64_t mem_usage = 0;
         {
@@ -77,10 +70,37 @@ public:
         return mem_usage;
     }
 
+    void get_writers_mem_consumption_snapshot(
+            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
+                    writers_mem_snap) {
+        std::lock_guard<SpinLock> l(_tablets_channels_lock);
+        for (auto& it : _tablets_channels) {
+            std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem;
+            it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem);
+            writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem));
+        }
+    }
+
     int64_t timeout() const { return _timeout_s; }
 
     bool is_high_priority() const { return _is_high_priority; }
 
+    void flush_memtable_async(int64_t index_id, int64_t tablet_id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _tablets_channels.find(index_id);
+        if (it != _tablets_channels.end()) {
+            it->second->flush_memtable_async(tablet_id);
+        }
+    }
+
+    void wait_flush(int64_t index_id, int64_t tablet_id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _tablets_channels.find(index_id);
+        if (it != _tablets_channels.end()) {
+            it->second->wait_flush(tablet_id);
+        }
+    }
+
 protected:
     Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished,
                                 const int64_t index_id);
@@ -107,10 +127,6 @@ protected:
     }
 
 private:
-    // when mem consumption exceeds limit, should call this method to find the channel
-    // that consumes the largest memory(, and then we can reduce its memory usage).
-    bool _find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel);
-
     UniqueId _load_id;
     // Tracks the total memory consumed by current load job on this BE
     std::unique_ptr<MemTracker> _mem_tracker;
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 15e9ea0c9e..fe4fe4250b 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -17,11 +17,15 @@
 
 #include "runtime/load_channel_mgr.h"
 
-#include "gutil/strings/substitute.h"
+#include <functional>
+#include <map>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <vector>
+
 #include "runtime/load_channel.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
-#include "service/backend_options.h"
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
 #include "util/time.h"
@@ -71,12 +75,6 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
     _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
-    // If a load channel's memory consumption is no more than 10% of the hard limit, it's not
-    // worth to reduce memory on it. Since we only reduce 1/3 memory for one load channel,
-    // for a channel consume 10% of hard limit, we can only release about 3% memory each time,
-    // it's not quite helpfull to reduce memory pressure.
-    // In this case we need to pick multiple load channels to reduce memory more effectively.
-    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
     _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
                                                            "LoadChannelMgrTrackerSet");
@@ -221,13 +219,16 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     int64_t process_mem_limit = MemInfo::soft_mem_limit();
+    int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache();
     if (_mem_tracker->consumption() < _load_soft_mem_limit &&
-        MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+        proc_mem_no_allocator_cache < process_mem_limit) {
         return;
     }
     // Indicate whether current thread is reducing mem on hard limit.
     bool reducing_mem_on_hard_limit = false;
-    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
+    // tuple<LoadChannel, index_id, tablet_id, mem_size>
+    std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t, int64_t>>
+            writers_to_reduce_mem;
     {
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
@@ -236,102 +237,117 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
             _wait_flush_cond.wait(l);
         }
         bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
-                                  MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
+                                  proc_mem_no_allocator_cache >= process_mem_limit;
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
         if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
             return;
         }
 
-        // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
-        // due to soft limit, and we reached hard limit now, current thread may pick some
-        // duplicate channels and trigger duplicate reducing memory process.
-        // But the load channel's reduce memory process is thread safe, only 1 thread can
-        // reduce memory at the same time, other threads will wait on a condition variable,
-        // after the reduce-memory work finished, all threads will return.
-        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
-        std::vector<ChannelMemPair> candidate_channels;
-        int64_t total_consume = 0;
+        // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>>
+        using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t,
+                                      std::multimap<int64_t, int64_t, std::greater<int64_t>>>;
+        std::vector<WritersMem> all_writers_mem;
+
+        // tuple<current iterator in multimap, end iterator in multimap, pos in all_writers_mem>
+        using WriterMemItem =
+                std::tuple<std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+                           std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+                           size_t>;
+        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
+            return std::get<0>(lhs)->first < std::get<0>(rhs)->first;
+        };
+        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)>
+                tablets_mem_heap(cmp);
+
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            int64_t mem = kv.second->mem_consumption();
-            // save the mem consumption, since the calculation might be expensive.
-            candidate_channels.push_back(std::make_pair(kv.second, mem));
-            total_consume += mem;
+            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
+                    writers_mem_snap;
+            kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
+            for (auto item : writers_mem_snap) {
+                // multimap is empty
+                if (item.second.empty()) {
+                    continue;
+                }
+                all_writers_mem.emplace_back(kv.second, item.first, std::move(item.second));
+                size_t pos = all_writers_mem.size() - 1;
+                tablets_mem_heap.emplace(std::get<2>(all_writers_mem[pos]).begin(),
+                                         std::get<2>(all_writers_mem[pos]).end(), pos);
+            }
         }
 
-        if (candidate_channels.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "All load channels are high priority, failed to find suitable"
-                         << "channels to reduce memory when total load mem limit exceed";
-            return;
+        // reduce 1/10 memory every time
+        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
+        int64_t mem_consumption_in_picked_writer = 0;
+        while (!tablets_mem_heap.empty()) {
+            WriterMemItem tablet_mem_item = tablets_mem_heap.top();
+            size_t pos = std::get<2>(tablet_mem_item);
+            auto load_channel = std::get<0>(all_writers_mem[pos]);
+            int64_t index_id = std::get<1>(all_writers_mem[pos]);
+            int64_t tablet_id = std::get<0>(tablet_mem_item)->second;
+            int64_t mem_size = std::get<0>(tablet_mem_item)->first;
+            writers_to_reduce_mem.emplace_back(load_channel, index_id, tablet_id, mem_size);
+            load_channel->flush_memtable_async(index_id, tablet_id);
+            mem_consumption_in_picked_writer += std::get<0>(tablet_mem_item)->first;
+            if (mem_consumption_in_picked_writer > mem_to_flushed) {
+                break;
+            }
+            tablets_mem_heap.pop();
+            if (std::get<0>(tablet_mem_item)++ != std::get<1>(tablet_mem_item)) {
+                tablets_mem_heap.push(tablet_mem_item);
+            }
         }
 
-        // sort all load channels, try to find the largest one.
-        std::sort(candidate_channels.begin(), candidate_channels.end(),
-                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
-                      return lhs.second > rhs.second;
-                  });
-
-        int64_t mem_consumption_in_picked_channel = 0;
-        auto largest_channel = *candidate_channels.begin();
-        // If some load-channel is big enough, we can reduce it only, try our best to avoid
-        // reducing small load channels.
-        if (_load_channel_min_mem_to_reduce > 0 &&
-            largest_channel.second > _load_channel_min_mem_to_reduce) {
-            // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(largest_channel.first);
-            mem_consumption_in_picked_channel = largest_channel.second;
-        } else {
-            // Pick multiple channels to reduce memory.
-            int64_t mem_to_flushed = total_consume / 3;
-            for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(ch.first);
-                mem_consumption_in_picked_channel += ch.second;
-                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
-                    break;
-                }
-            }
+        if (writers_to_reduce_mem.empty()) {
+            // should not happen, add log to observe
+            LOG(WARNING) << "failed to find suitable writers to reduce memory"
+                         << " when total load mem limit exceed";
+            return;
         }
 
         std::ostringstream oss;
-        if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because total load mem consumption "
-                << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
-                << " has exceeded";
+        oss << "reducing memory of " << writers_to_reduce_mem.size()
+            << " delta writers (total mem: "
+            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
+            << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
+            << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
+            << "), ";
+        if (proc_mem_no_allocator_cache < process_mem_limit) {
+            oss << "because total load mem consumption "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
             if (_mem_tracker->consumption() > _load_hard_mem_limit) {
                 _should_wait_flush = true;
                 reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
+                oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit);
             } else {
                 _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
+                oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit);
             }
         } else {
             _should_wait_flush = true;
             reducing_mem_on_hard_limit = true;
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
-                << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
-                << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
+            oss << "because proc_mem_no_allocator_cache consumption "
+                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
+                << ", has exceeded process limit " << PrettyPrinter::print_bytes(process_mem_limit)
+                << ", total load mem consumption: "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                << ", vm_rss: " << PerfCounters::get_vm_rss_str()
+                << ", tc/jemalloc allocator cache: " << MemInfo::allocator_cache_mem_str();
         }
         LOG(INFO) << oss.str();
     }
 
-    for (auto ch : channels_to_reduce_mem) {
-        uint64_t begin = GetCurrentTimeMicros();
-        int64_t mem_usage = ch->mem_consumption();
-        ch->handle_mem_exceed_limit();
-        LOG(INFO) << "reduced memory of " << *ch << ", cost "
-                  << (GetCurrentTimeMicros() - begin) / 1000
-                  << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
+    // wait all writers flush without lock
+    for (auto item : writers_to_reduce_mem) {
+        VLOG_NOTICE << "reducing memory, wait flush load_id: " << std::get<0>(item)->load_id()
+                    << ", index_id: " << std::get<1>(item) << ", tablet_id: " << std::get<2>(item)
+                    << ", mem_size: " << PrettyPrinter::print_bytes(std::get<3>(item));
+        std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item));
     }
 
     {
@@ -345,8 +361,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
         if (_soft_reduce_mem_in_progress) {
             _soft_reduce_mem_in_progress = false;
         }
+        // refresh mem tacker to avoid duplicate reduce
+        _refresh_mem_tracker_without_lock();
     }
-    return;
 }
 
 } // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 9bf604d87e..967617e9bc 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -24,14 +24,10 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/ref_counted.h"
 #include "olap/lru_cache.h"
 #include "runtime/load_channel.h"
-#include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
 #include "util/countdown_latch.h"
 #include "util/thread.h"
 #include "util/uid_util.h"
@@ -59,14 +55,8 @@ public:
     Status cancel(const PTabletWriterCancelRequest& request);
 
     void refresh_mem_tracker() {
-        int64_t mem_usage = 0;
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            for (auto& kv : _load_channels) {
-                mem_usage += kv.second->mem_consumption();
-            }
-        }
-        _mem_tracker->set_consumption(mem_usage);
+        std::lock_guard<std::mutex> l(_lock);
+        _refresh_mem_tracker_without_lock();
     }
     MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
 
@@ -82,6 +72,15 @@ private:
 
     Status _start_bg_worker();
 
+    // lock should be held when calling this method
+    void _refresh_mem_tracker_without_lock() {
+        int64_t mem_usage = 0;
+        for (auto& kv : _load_channels) {
+            mem_usage += kv.second->mem_consumption();
+        }
+        _mem_tracker->set_consumption(mem_usage);
+    }
+
 protected:
     // lock protect the load channel map
     std::mutex _lock;
@@ -95,13 +94,6 @@ protected:
     std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
-    // By default, we try to reduce memory on the load channel with largest mem consumption,
-    // but if there are lots of small load channel, even the largest one consumes very
-    // small memory, in this case we need to pick multiple load channels to reduce memory
-    // more effectively.
-    // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
-    // memory consumption is big enough.
-    int64_t _load_channel_min_mem_to_reduce = -1;
     bool _soft_reduce_mem_in_progress = false;
 
     // If hard limit reached, one thread will trigger load channel flush,
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 483fd22edd..089b5ba4be 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -23,10 +23,7 @@
 #include "olap/storage_engine.h"
 #include "runtime/load_channel.h"
 #include "runtime/row_batch.h"
-#include "runtime/thread_context.h"
-#include "runtime/tuple_row.h"
 #include "util/doris_metrics.h"
-#include "util/time.h"
 
 namespace doris {
 
@@ -208,136 +205,16 @@ int64_t TabletsChannel::mem_consumption() {
     int64_t mem_usage = 0;
     {
         std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        _mem_consumptions.clear();
         for (auto& it : _tablet_writers) {
-            mem_usage += it.second->mem_consumption();
+            int64_t writer_mem = it.second->mem_consumption();
+            mem_usage += writer_mem;
+            _mem_consumptions.emplace(writer_mem, it.first);
         }
     }
     return mem_usage;
 }
 
-void TabletsChannel::reduce_mem_usage() {
-    if (_try_to_wait_flushing()) {
-        // `_try_to_wait_flushing()` returns true means other thread already
-        // reduced the mem usage, and current thread do not need to reduce again.
-        LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
-                  << ", index_id: " << _index_id;
-        return;
-    }
-
-    std::vector<DeltaWriter*> writers_to_wait_flush;
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (_state == kFinished) {
-            // TabletsChannel is closed without LoadChannel's lock,
-            // therefore it's possible for reduce_mem_usage() to be called right after close()
-            LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
-                      << ", index_id: " << _index_id;
-            return;
-        }
-
-        // Sort the DeltaWriters by mem consumption in descend order.
-        std::vector<DeltaWriter*> writers;
-        for (auto& it : _tablet_writers) {
-            it.second->save_mem_consumption_snapshot();
-            writers.push_back(it.second);
-        }
-        int64_t total_memtable_consumption_in_flush = 0;
-        for (auto writer : writers) {
-            if (writer->get_memtable_consumption_inflush() > 0) {
-                writers_to_wait_flush.push_back(writer);
-                total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush();
-            }
-        }
-        std::sort(writers.begin(), writers.end(),
-                  [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
-                      return lhs->get_memtable_consumption_snapshot() >
-                             rhs->get_memtable_consumption_snapshot();
-                  });
-
-        // Decide which writes should be flushed to reduce mem consumption.
-        // The main idea is to flush at least one third of the mem_limit.
-        // This is mainly to solve the following scenarios.
-        // Suppose there are N tablets in this TabletsChannel, and the mem limit is M.
-        // If the data is evenly distributed, when each tablet memory accumulates to M/N,
-        // the reduce memory operation will be triggered.
-        // At this time, the value of M/N may be much smaller than the value of `write_buffer_size`.
-        // If we flush all the tablets at this time, each tablet will generate a lot of small files.
-        // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered,
-        // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
-
-        int64_t mem_to_flushed = mem_consumption() / 3;
-        if (total_memtable_consumption_in_flush < mem_to_flushed) {
-            mem_to_flushed -= total_memtable_consumption_in_flush;
-            int counter = 0;
-            int64_t sum = 0;
-            for (auto writer : writers) {
-                if (writer->mem_consumption() <= 0) {
-                    break;
-                }
-                ++counter;
-                sum += writer->mem_consumption();
-                if (sum > mem_to_flushed) {
-                    break;
-                }
-            }
-            std::ostringstream ss;
-            ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush
-               << " will flush " << counter << " more memtables to reduce memory: " << sum;
-            if (counter > 0) {
-                ss << ", the size of smallest memtable to flush is "
-                   << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
-            }
-            LOG(INFO) << ss.str();
-            // following loop flush memtable async, we'll do it with _lock
-            for (int i = 0; i < counter; i++) {
-                Status st = writers[i]->flush_memtable_and_wait(false);
-                if (!st.ok()) {
-                    LOG_WARNING(
-                            "tablet writer failed to reduce mem consumption by flushing memtable")
-                            .tag("tablet_id", writers[i]->tablet_id())
-                            .tag("txn_id", _txn_id)
-                            .error(st);
-                    writers[i]->cancel_with_status(st);
-                    _broken_tablets.insert(writers[i]->tablet_id());
-                }
-            }
-            for (int i = 0; i < counter; i++) {
-                if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
-                    // skip broken tablets
-                    continue;
-                }
-                writers_to_wait_flush.push_back(writers[i]);
-            }
-            _reducing_mem_usage = true;
-        } else {
-            LOG(INFO) << "total size of memtables in flush is big enough: "
-                      << total_memtable_consumption_in_flush
-                      << " bytes, will not flush more memtables";
-        }
-    }
-
-    for (auto writer : writers_to_wait_flush) {
-        Status st = writer->wait_flush();
-        if (!st.ok()) {
-            LOG_WARNING(
-                    "tablet writer failed to reduce mem consumption by waiting flushing memtable")
-                    .tag("tablet_id", writer->tablet_id())
-                    .tag("txn_id", _txn_id)
-                    .error(st);
-            writer->cancel_with_status(st);
-            _broken_tablets.insert(writer->tablet_id());
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _reducing_mem_usage = false;
-        _reduce_memory_cond.notify_all();
-    }
-
-    return;
-}
-
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -387,26 +264,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
-bool TabletsChannel::_try_to_wait_flushing() {
-    bool duplicate_work = false;
-    std::unique_lock<std::mutex> l(_lock);
-    // NOTE: we call `reduce_mem_usage()` because we think it's necessary
-    // to reduce it's memory and should not write more data into this
-    // tablets channel. If there's already some other thead doing the
-    // reduce-memory work, the only choice for current thread is to wait
-    // here.
-    // If current thread do not wait, it has two options:
-    // 1. continue to write data to current channel.
-    // 2. pick another tablets channel to flush
-    // The first choice might cause OOM, the second choice might pick a
-    // channel that is not big enough.
-    while (_reducing_mem_usage) {
-        duplicate_work = true;
-        _reduce_memory_cond.wait(l);
-    }
-    return duplicate_work;
-}
-
 Status TabletsChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
@@ -500,6 +357,70 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     return Status::OK();
 }
 
+void TabletsChannel::flush_memtable_async(int64_t tablet_id) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (_state == kFinished) {
+        // TabletsChannel is closed without LoadChannel's lock,
+        // therefore it's possible for reduce_mem_usage() to be called right after close()
+        LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+                  << ", index_id: " << _index_id;
+        return;
+    }
+
+    auto iter = _tablet_writers.find(tablet_id);
+    if (iter == _tablet_writers.end()) {
+        return;
+    }
+
+    if (!(_reducing_tablets.insert(tablet_id).second)) {
+        return;
+    }
+
+    Status st = iter->second->flush_memtable_and_wait(false);
+    if (!st.ok()) {
+        auto err_msg = fmt::format(
+                "tablet writer failed to reduce mem consumption by flushing memtable, "
+                "tablet_id={}, txn_id={}, err={}",
+                tablet_id, _txn_id, st);
+        LOG(WARNING) << err_msg;
+        iter->second->cancel_with_status(st);
+        _broken_tablets.insert(iter->second->tablet_id());
+    }
+}
+
+void TabletsChannel::wait_flush(int64_t tablet_id) {
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (_state == kFinished) {
+            // TabletsChannel is closed without LoadChannel's lock,
+            // therefore it's possible for reduce_mem_usage() to be called right after close()
+            LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+                      << ", index_id: " << _index_id;
+            return;
+        }
+    }
+
+    auto iter = _tablet_writers.find(tablet_id);
+    if (iter == _tablet_writers.end()) {
+        return;
+    }
+    Status st = iter->second->wait_flush();
+    if (!st.ok()) {
+        auto err_msg = fmt::format(
+                "tablet writer failed to reduce mem consumption by flushing memtable, "
+                "tablet_id={}, txn_id={}, err={}",
+                tablet_id, _txn_id, st);
+        LOG(WARNING) << err_msg;
+        iter->second->cancel_with_status(st);
+        _broken_tablets.insert(iter->second->tablet_id());
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        _reducing_tablets.erase(tablet_id);
+    }
+}
+
 template Status
 TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>(
         PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index c774994cb2..e649852a47 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -18,22 +18,19 @@
 #pragma once
 
 #include <cstdint>
+#include <functional>
+#include <map>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
 #include "util/bitmap.h"
-#include "util/countdown_latch.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
-#include "vec/core/block.h"
 
 namespace doris {
 
@@ -87,14 +84,17 @@ public:
     // no-op when this channel has been closed or cancelled
     Status cancel();
 
-    // upper application may call this to try to reduce the mem usage of this channel.
-    // eg. flush the largest memtable immediately.
-    // return Status::OK if mem is reduced.
-    // no-op when this channel has been closed or cancelled
-    void reduce_mem_usage();
-
     int64_t mem_consumption();
 
+    void get_writers_mem_consumption_snapshot(
+            std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
+        std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        *mem_consumptions = _mem_consumptions;
+    }
+
+    void flush_memtable_async(int64_t tablet_id);
+    void wait_flush(int64_t tablet_id);
+
 private:
     template <typename Request>
     Status _get_current_seq(int64_t& cur_seq, const Request& request);
@@ -151,11 +151,7 @@ private:
     // So that following batch will not handle this tablet anymore.
     std::unordered_set<int64_t> _broken_tablets;
 
-    bool _reducing_mem_usage = false;
-    // only one thread can reduce memory for one TabletsChannel.
-    // if some other thread call `reduce_memory_usage` at the same time,
-    // it will wait on this condition variable.
-    std::condition_variable _reduce_memory_cond;
+    std::unordered_set<int64_t> _reducing_tablets;
 
     std::unordered_set<int64_t> _partition_ids;
 
@@ -164,6 +160,10 @@ private:
     bool _is_high_priority = false;
 
     bool _write_single_replica = false;
+
+    // mem -> tablet_id
+    // sort by memory size
+    std::multimap<int64_t, int64_t, std::greater<int64_t>> _mem_consumptions;
 };
 
 template <typename Request>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org