You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zo...@apache.org on 2023/06/19 10:34:51 UTC

[doris] branch master updated: [improvement](load) avoid producing small segment (#20852)

This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a3342cb088 [improvement](load) avoid producing small segment (#20852)
a3342cb088 is described below

commit a3342cb088702e60bfd0fba83dca8d2e5cf65990
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Mon Jun 19 18:34:44 2023 +0800

    [improvement](load) avoid producing small segment (#20852)
    
    avoid producing small segment
---
 be/src/olap/delta_writer.cpp        | 38 +++++++++++++++++--------------------
 be/src/olap/delta_writer.h          | 14 +-------------
 be/src/runtime/load_channel.h       |  4 ++--
 be/src/runtime/load_channel_mgr.cpp |  7 ++++---
 be/src/runtime/tablets_channel.cpp  | 10 ++++++++++
 be/src/runtime/tablets_channel.h    |  7 ++-----
 6 files changed, 36 insertions(+), 44 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index df9167484b..5c0d7dc2d2 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -529,27 +529,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
     return Status::OK();
 }
 
-void DeltaWriter::save_mem_consumption_snapshot() {
-    std::lock_guard<std::mutex> l(_lock);
-    _mem_consumption_snapshot = mem_consumption(MemType::ALL);
-    if (_mem_table == nullptr) {
-        _memtable_consumption_snapshot = 0;
-    } else {
-        _memtable_consumption_snapshot = _mem_table->memory_usage();
-    }
-}
-
-int64_t DeltaWriter::get_memtable_consumption_inflush() const {
-    if (!_is_init || _flush_token->get_stats().flush_running_count == 0) {
-        return 0;
-    }
-    return _mem_consumption_snapshot - _memtable_consumption_snapshot;
-}
-
-int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
-    return _memtable_consumption_snapshot;
-}
-
 int64_t DeltaWriter::mem_consumption(MemType mem) {
     if (_flush_token == nullptr) {
         // This method may be called before this writer is initialized.
@@ -573,6 +552,23 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
     return mem_usage;
 }
 
+int64_t DeltaWriter::active_memtable_mem_consumption() {
+    if (_flush_token == nullptr) {
+        // This method may be called before this writer is initialized.
+        // So _flush_token may be null.
+        return 0;
+    }
+    int64_t mem_usage = 0;
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        if (_mem_table_insert_trackers.size() > 0) {
+            mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption();
+            mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption();
+        }
+    }
+    return mem_usage;
+}
+
 int64_t DeltaWriter::partition_id() const {
     return _req.partition_id;
 }
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index b91144751c..52b407876f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -116,6 +116,7 @@ public:
     int64_t partition_id() const;
 
     int64_t mem_consumption(MemType mem);
+    int64_t active_memtable_mem_consumption();
 
     // Wait all memtable in flush queue to be flushed
     Status wait_flush();
@@ -124,12 +125,6 @@ public:
 
     int32_t schema_hash() { return _tablet->schema_hash(); }
 
-    void save_mem_consumption_snapshot();
-
-    int64_t get_memtable_consumption_inflush() const;
-
-    int64_t get_memtable_consumption_snapshot() const;
-
     void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
     int64_t total_received_rows() const { return _total_received_rows; }
@@ -181,13 +176,6 @@ private:
 
     std::mutex _lock;
 
-    // memory consumption snapshot for current delta_writer, only
-    // used for std::sort
-    int64_t _mem_consumption_snapshot = 0;
-    // memory consumption snapshot for current memtable, only
-    // used for std::sort
-    int64_t _memtable_consumption_snapshot = 0;
-
     std::unordered_set<int64_t> _unfinished_slave_node;
     PSuccessSlaveTabletNodeIds _success_slave_node_ids;
     std::shared_mutex _slave_node_lock;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index e746014fde..5648cb0ae1 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -87,13 +87,13 @@ public:
         return mem_usage;
     }
 
-    void get_writers_mem_consumption_snapshot(
+    void get_active_memtable_mem_consumption(
             std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
                     writers_mem_snap) {
         std::lock_guard<SpinLock> l(_tablets_channels_lock);
         for (auto& it : _tablets_channels) {
             std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem;
-            it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem);
+            it.second->get_active_memtable_mem_consumption(&tablets_channel_mem);
             writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem));
         }
     }
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 1a9fd86f2d..a9d94aedb9 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -360,12 +360,12 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
                 // do not select high priority channel to reduce memory
-                // to avoid blocking them.
+                // to avoid blocking them.
                 continue;
             }
             std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
                     writers_mem_snap;
-            kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
+            kv.second->get_active_memtable_mem_consumption(&writers_mem_snap);
             for (auto item : writers_mem_snap) {
                 // multimap is empty
                 if (item.second.empty()) {
@@ -412,8 +412,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
             << " delta writers (total mem: "
             << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
             << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
+            << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front())
             << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
-            << "), ";
+            << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << "), ";
         if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
             oss << "because total load mem consumption "
                 << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index b7ac5dcd64..05b17c5931 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -282,6 +282,16 @@ int64_t TabletsChannel::mem_consumption() {
     return write_mem_usage + flush_mem_usage;
 }
 
+void TabletsChannel::get_active_memtable_mem_consumption(
+        std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
+    mem_consumptions->clear();
+    std::lock_guard<SpinLock> l(_tablet_writers_lock);
+    for (auto& it : _tablet_writers) {
+        int64_t active_memtable_mem = it.second->active_memtable_mem_consumption();
+        mem_consumptions->emplace(active_memtable_mem, it.first);
+    }
+}
+
 // Old logic,used for opening all writers of all partitions.
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 58cfd961ff..b9235925b0 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -115,11 +115,8 @@ public:
 
     int64_t mem_consumption();
 
-    void get_writers_mem_consumption_snapshot(
-            std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
-        std::lock_guard<SpinLock> l(_tablet_writers_lock);
-        *mem_consumptions = _mem_consumptions;
-    }
+    void get_active_memtable_mem_consumption(
+            std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions);
 
     void flush_memtable_async(int64_t tablet_id);
     void wait_flush(int64_t tablet_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org