You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/21 00:35:42 UTC

[doris] branch master updated: [Enhancement](load) consider memtable in flush while reducing load me… (#13480)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b0dafcaa1 [Enhancement](load) consider memtable in flush while reducing load me… (#13480)
1b0dafcaa1 is described below

commit 1b0dafcaa181d985afa119387c4c15d689a2e4d5
Author: zhannngchen <48...@users.noreply.github.com>
AuthorDate: Fri Oct 21 08:35:35 2022 +0800

    [Enhancement](load) consider memtable in flush while reducing load me… (#13480)
    
    We should account for memory that is still being flushed from memtables to disk when trying to reduce memory usage by flushing memtables. Otherwise, we may not release as much memory as expected. For example, if many large memtables are already being flushed, reduce_mem_usage may pick a few small memtables to flush; this cannot release enough memory and can also generate many small segments, which can cause a -238 error.
---
 be/src/olap/delta_writer.cpp       |  9 +++-
 be/src/olap/delta_writer.h         |  7 ++-
 be/src/runtime/load_channel.h      |  3 --
 be/src/runtime/tablets_channel.cpp | 97 +++++++++++++++++++++++---------------
 4 files changed, 73 insertions(+), 43 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index bbb9074514..e7ade20214 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -383,9 +383,14 @@ Status DeltaWriter::cancel() {
     return Status::OK();
 }
 
-int64_t DeltaWriter::save_memtable_consumption_snapshot() {
+void DeltaWriter::save_mem_consumption_snapshot() {
+    _mem_consumption_snapshot = mem_consumption();
     _memtable_consumption_snapshot = memtable_consumption();
-    return _memtable_consumption_snapshot;
+}
+
+int64_t DeltaWriter::get_memtable_consumption_inflush() const {
+    if (_flush_token->get_stats().flush_running_count == 0) return 0;
+    return _mem_consumption_snapshot - _memtable_consumption_snapshot;
 }
 
 int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4ad7df38ed..a7770326fd 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -104,7 +104,9 @@ public:
 
     int64_t memtable_consumption() const;
 
-    int64_t save_memtable_consumption_snapshot();
+    void save_mem_consumption_snapshot();
+
+    int64_t get_memtable_consumption_inflush() const;
 
     int64_t get_memtable_consumption_snapshot() const;
 
@@ -161,6 +163,9 @@ private:
     // use in vectorized load
     bool _is_vec;
 
+    // memory consumption snapshot for current delta_writer, only
+    // used for std::sort
+    int64_t _mem_consumption_snapshot = 0;
     // memory consumption snapshot for current memtable, only
     // used for std::sort
     int64_t _memtable_consumption_snapshot = 0;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index da2b43ceaa..a75639e433 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -177,9 +177,6 @@ Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
     {
         // lock so that only one thread can check mem limit
         std::lock_guard<std::mutex> l(_lock);
-
-        LOG(INFO) << "reducing memory of " << *this
-                  << " ,mem consumption: " << _mem_tracker->consumption();
         found = _find_largest_consumption_channel(&channel);
     }
     // Release lock so that other threads can still call add_batch concurrently.
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index fa72d23ad0..d833b5d887 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -199,10 +199,12 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
     if (_try_to_wait_flushing()) {
         // `_try_to_wait_flushing()` returns true means other thread already
         // reduced the mem usage, and current thread do not need to reduce again.
+        LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
+                  << ", index_id: " << _index_id;
         return Status::OK();
     }
 
-    std::vector<DeltaWriter*> writers_to_flush;
+    std::vector<DeltaWriter*> writers_to_wait_flush;
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_state == kFinished) {
@@ -214,9 +216,16 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         // Sort the DeltaWriters by mem consumption in descend order.
         std::vector<DeltaWriter*> writers;
         for (auto& it : _tablet_writers) {
-            it.second->save_memtable_consumption_snapshot();
+            it.second->save_mem_consumption_snapshot();
             writers.push_back(it.second);
         }
+        int64_t total_memtable_consumption_in_flush = 0;
+        for (auto writer : writers) {
+            if (writer->get_memtable_consumption_inflush() > 0) {
+                writers_to_wait_flush.push_back(writer);
+                total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush();
+            }
+        }
         std::sort(writers.begin(), writers.end(),
                   [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
                       return lhs->get_memtable_consumption_snapshot() >
@@ -235,48 +244,62 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
 
         int64_t mem_to_flushed = _mem_tracker->consumption() / 3;
-        int counter = 0;
-        int64_t sum = 0;
-        for (auto writer : writers) {
-            if (writer->mem_consumption() <= 0) {
-                break;
+        if (total_memtable_consumption_in_flush < mem_to_flushed) {
+            mem_to_flushed -= total_memtable_consumption_in_flush;
+            int counter = 0;
+            int64_t sum = 0;
+            for (auto writer : writers) {
+                if (writer->mem_consumption() <= 0) {
+                    break;
+                }
+                ++counter;
+                sum += writer->mem_consumption();
+                if (sum > mem_to_flushed) {
+                    break;
+                }
             }
-            ++counter;
-            sum += writer->mem_consumption();
-            if (sum > mem_to_flushed) {
-                break;
+            std::ostringstream ss;
+            ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush
+               << " will flush " << counter << " more memtables to reduce memory: " << sum;
+            if (counter > 0) {
+                ss << ", the size of smallest memtable to flush is "
+                   << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
             }
-        }
-        VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " << sum;
-        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
-                response->mutable_tablet_errors();
-        // following loop flush memtable async, we'll do it with _lock
-        for (int i = 0; i < counter; i++) {
-            Status st = writers[i]->flush_memtable_and_wait(false);
-            if (!st.ok()) {
-                auto err_msg = strings::Substitute(
-                        "tablet writer failed to reduce mem consumption by flushing memtable, "
-                        "tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
-                        writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
-                        st.get_error_msg());
-                LOG(WARNING) << err_msg;
-                PTabletError* error = tablet_errors->Add();
-                error->set_tablet_id(writers[i]->tablet_id());
-                error->set_msg(err_msg);
-                _broken_tablets.insert(writers[i]->tablet_id());
+            LOG(INFO) << ss.str();
+            google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+                    response->mutable_tablet_errors();
+            // following loop flush memtable async, we'll do it with _lock
+            for (int i = 0; i < counter; i++) {
+                Status st = writers[i]->flush_memtable_and_wait(false);
+                if (!st.ok()) {
+                    auto err_msg = strings::Substitute(
+                            "tablet writer failed to reduce mem consumption by flushing memtable, "
+                            "tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
+                            writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
+                            st.get_error_msg());
+                    LOG(WARNING) << err_msg;
+                    PTabletError* error = tablet_errors->Add();
+                    error->set_tablet_id(writers[i]->tablet_id());
+                    error->set_msg(err_msg);
+                    _broken_tablets.insert(writers[i]->tablet_id());
+                }
             }
-        }
-        for (int i = 0; i < counter; i++) {
-            if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
-                // skip broken tablets
-                continue;
+            for (int i = 0; i < counter; i++) {
+                if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
+                    // skip broken tablets
+                    continue;
+                }
+                writers_to_wait_flush.push_back(writers[i]);
             }
-            writers_to_flush.push_back(writers[i]);
+            _reducing_mem_usage = true;
+        } else {
+            LOG(INFO) << "total size of memtables in flush is big enough: "
+                      << total_memtable_consumption_in_flush
+                      << " bytes, will not flush more memtables";
         }
-        _reducing_mem_usage = true;
     }
 
-    for (auto writer : writers_to_flush) {
+    for (auto writer : writers_to_wait_flush) {
         Status st = writer->wait_flush();
         if (!st.ok()) {
             return Status::InternalError(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org