You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/18 06:31:41 UTC

[doris] branch dev-1.1.1 updated: [bugfix 1.1.1]fix_physical_mem_limit (#10967)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new d33dd7dc18 [bugfix 1.1.1]fix_physical_mem_limit (#10967)
d33dd7dc18 is described below

commit d33dd7dc18ce19549e24a7db4c6dcba297137dc4
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Mon Jul 18 14:31:36 2022 +0800

    [bugfix 1.1.1]fix_physical_mem_limit (#10967)
---
 be/src/runtime/mem_tracker.h  |  7 +++-
 be/src/service/doris_main.cpp |  4 +-
 be/src/util/CMakeLists.txt    |  2 +-
 be/src/util/mem_info.h        |  2 +
 be/src/util/perf_counters.cpp | 97 ++++++++++++++++++++++++++++++++++++-------
 be/src/util/perf_counters.h   | 22 ++++++++++
 6 files changed, 117 insertions(+), 17 deletions(-)

diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 1622a70e71..9166462aba 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -32,6 +32,7 @@
 #include "gen_cpp/Types_types.h" // for TUniqueId
 #include "util/mem_info.h"
 #include "util/metrics.h"
+#include "util/perf_counters.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 
@@ -173,7 +174,11 @@ public:
             Release(-bytes);
             return Status::OK();
         }
-        if (MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
+        // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
+        // This is independent of the consumption value of the mem tracker, which counts the virtual memory
+        // of the process malloc.
+        // for fast, expect MemInfo::initialized() to be true.
+        if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
             return Status::MemoryLimitExceeded(fmt::format(
                     "{}: TryConsume failed, bytes={} process whole consumption={}  mem limit={}",
                     label_, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index d8c4695b25..59fbdc8330 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -57,6 +57,7 @@
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/logging.h"
+#include "util/perf_counters.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
@@ -475,7 +476,8 @@ int main(int argc, char** argv) {
 #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
         doris::MemInfo::refresh_current_mem();
 #endif
-        sleep(10);
+        doris::PerfCounters::refresh_proc_status();
+        sleep(1);
     }
 
     http_service.stop();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 0582c57b80..59bf916ee2 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -47,7 +47,7 @@ set(UTIL_FILES
   parse_util.cpp
   path_builder.cpp
 # TODO: not supported on RHEL 5
-# perf-counters.cpp
+  perf_counters.cpp
   progress_updater.cpp
   runtime_profile.cpp
   static_asserts.cpp
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 6ae8669f86..53972ee925 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -42,6 +42,8 @@ public:
 
     static inline size_t current_mem() { return _s_current_mem; }
 
+    // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
+    // obtained by the process malloc, not the physical memory actually used by the process in the OS.
     static inline void refresh_current_mem() {
         MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
                                                         &_s_current_mem);
diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp
index 5b7171d065..fc62255f9d 100644
--- a/be/src/util/perf_counters.cpp
+++ b/be/src/util/perf_counters.cpp
@@ -23,12 +23,17 @@
 #include <string.h>
 #include <sys/syscall.h>
 
+#include <boost/algorithm/string/trim.hpp>
 #include <fstream>
 #include <iomanip>
 #include <iostream>
 #include <sstream>
 
+#include "gutil/strings/substitute.h"
 #include "util/debug_util.h"
+#include "util/pretty_printer.h"
+#include "util/string_parser.hpp"
+#include "util/string_util.h"
 
 namespace doris {
 
@@ -36,6 +41,8 @@ namespace doris {
 #define BUFFER_SIZE 256
 #define PRETTY_PRINT_WIDTH 13
 
+static std::unordered_map<std::string, std::string> _process_state;
+
 // This is the order of the counters in /proc/self/io
 enum PERF_IO_IDX {
     PROC_IO_READ = 0,
@@ -123,7 +130,7 @@ static bool init_event_attr(perf_event_attr* attr, PerfCounters::Counter counter
     return true;
 }
 
-static string get_counter_name(PerfCounters::Counter counter) {
+static std::string get_counter_name(PerfCounters::Counter counter) {
     switch (counter) {
     case PerfCounters::PERF_COUNTER_SW_CPU_CLOCK:
         return "CPUTime";
@@ -275,7 +282,7 @@ bool PerfCounters::init_proc_self_status_counter(Counter counter) {
     return true;
 }
 
-bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_sys_counters(std::vector<int64_t>& buffer) {
     for (int i = 0; i < _counters.size(); i++) {
         if (_counters[i].source == SYS_PERF_COUNTER) {
             int num_bytes = read(_counters[i].fd, &buffer[i], COUNTER_SIZE);
@@ -303,7 +310,7 @@ bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
 //    read_bytes: 0
 //    write_bytes: 0
 //    cancelled_write_bytes: 0
-bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_io_counters(std::vector<int64_t>& buffer) {
     std::ifstream file("/proc/self/io", std::ios::in);
     std::string buf;
     int64_t values[PROC_IO_LAST_COUNTER];
@@ -343,9 +350,9 @@ bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
     return true;
 }
 
-bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_status_counters(std::vector<int64_t>& buffer) {
     std::ifstream file("/proc/self/status", std::ios::in);
-    string buf;
+    std::string buf;
 
     while (file) {
         getline(file, buf);
@@ -354,13 +361,13 @@ bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
             if (_counters[i].source == PROC_SELF_STATUS) {
                 size_t field = buf.find(_counters[i].proc_status_field);
 
-                if (field == string::npos) {
+                if (field == std::string::npos) {
                     continue;
                 }
 
                 size_t colon = field + _counters[i].proc_status_field.size() + 1;
                 buf = buf.substr(colon + 1);
-                istringstream stream(buf);
+                std::istringstream stream(buf);
                 int64_t value;
                 stream >> value;
                 buffer[i] = value * 1024; // values in file are in kb
@@ -455,12 +462,12 @@ bool PerfCounters::add_counter(Counter counter) {
 }
 
 // Query all the counters right now and store the values in results
-void PerfCounters::snapshot(const string& name) {
+void PerfCounters::snapshot(const std::string& name) {
     if (_counters.size() == 0) {
         return;
     }
 
-    string fixed_name = name;
+    std::string fixed_name = name;
 
     if (fixed_name.size() == 0) {
         std::stringstream ss;
@@ -486,22 +493,22 @@ const std::vector<int64_t>* PerfCounters::counters(int snapshot) const {
     return &_snapshots[snapshot];
 }
 
-void PerfCounters::pretty_print(ostream* s) const {
+void PerfCounters::pretty_print(std::ostream* s) const {
     std::ostream& stream = *s;
-    std::stream << setw(8) << "snapshot";
+    stream << std::setw(8) << "snapshot";
 
     for (int i = 0; i < _counter_names.size(); ++i) {
-        stream << setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
+        stream << std::setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
     }
 
     stream << std::endl;
 
     for (int s = 0; s < _snapshots.size(); s++) {
-        stream << setw(8) << _snapshot_names[s];
+        stream << std::setw(8) << _snapshot_names[s];
         const std::vector<int64_t>& snapshot = _snapshots[s];
 
         for (int i = 0; i < snapshot.size(); ++i) {
-            stream << setw(PRETTY_PRINT_WIDTH)
+            stream << std::setw(PRETTY_PRINT_WIDTH)
                    << PrettyPrinter::print(snapshot[i], _counters[i].type);
         }
 
@@ -511,4 +518,66 @@ void PerfCounters::pretty_print(ostream* s) const {
     stream << std::endl;
 }
 
+// Refactor below
+
+int PerfCounters::parse_int(const std::string& state_key) { // atoi() of the cached value
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) return atoi(it->second.c_str());
+    return -1; // state_key is not present in the cached status map
+}
+
+int64_t PerfCounters::parse_int64(const std::string& state_key) { // cached value as int64
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) {
+        StringParser::ParseResult result;
+        int64_t state_value =
+                StringParser::string_to_int<int64_t>(it->second.data(), it->second.size(), &result);
+        if (result == StringParser::PARSE_SUCCESS) return state_value;
+    }
+    return -1; // key missing, or the parser did not report PARSE_SUCCESS
+}
+
+std::string PerfCounters::parse_string(const std::string& state_key) { // cached raw value
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) return it->second;
+    return std::string(); // empty string when state_key is not cached
+}
+
+int64_t PerfCounters::parse_bytes(const std::string& state_key) { // cached size, in bytes
+    auto it = _process_state.find(state_key);
+    if (it != _process_state.end()) {
+        std::vector<std::string> fields = split(it->second, " ");
+        // Accepted forms: '16129508' (bytes) and '16129508 kB'; any other unit falls to -1.
+        StringParser::ParseResult result;
+        int64_t state_value =
+                StringParser::string_to_int<int64_t>(fields[0].data(), fields[0].size(), &result);
+        if (result == StringParser::PARSE_SUCCESS) {
+            if (fields.size() < 2) return state_value;
+            if (fields[1].compare("kB") == 0) return state_value * 1024L; // kB -> bytes
+        }
+    }
+    return -1; // key missing, number not parsed, or unit not recognised
+}
+
+// Reloads the cache: each "Key:\tvalue" line of the file becomes "status/Key" -> trimmed value.
+void PerfCounters::refresh_proc_status() { // NOTE(review): map is written without a lock
+    std::ifstream status_file("/proc/self/status", std::ios::in);
+    std::string row;
+    while (getline(status_file, row)) {
+        std::vector<std::string> parts = split(row, "\t");
+        if (parts.size() >= 2) {
+            boost::algorithm::trim(parts[1]);
+            // parts[0] is the field name plus one trailing character (':' per proc(5)).
+            const std::string name = parts[0].substr(0, parts[0].size() - 1);
+            _process_state[strings::Substitute("status/$0", name)] = parts[1];
+        }
+    }
+}
+
+void PerfCounters::get_proc_status(ProcStatus* out) { // fills *out from the cached status
+    out->vm_size = parse_bytes("status/VmSize"); // each field: bytes, or -1 if unavailable
+    out->vm_peak = parse_bytes("status/VmPeak");
+    out->vm_rss = parse_bytes("status/VmRSS");
+}
+
 } // namespace doris
diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h
index aa4c4bf358..ca130e511c 100644
--- a/be/src/util/perf_counters.h
+++ b/be/src/util/perf_counters.h
@@ -39,6 +39,8 @@
 //  <do your work>
 //  counters.snapshot("After Work");
 //  counters.PrettyPrint(cout);
+//
+// TODO: Expect PerfCounters to be refactored to ProcessState.
 
 namespace doris {
 
@@ -93,6 +95,26 @@ public:
     PerfCounters();
     ~PerfCounters();
 
+    // Refactor
+
+    struct ProcStatus { // memory figures filled in by get_proc_status()
+        int64_t vm_size = 0; // from the `VmSize` status field
+        int64_t vm_peak = 0; // from the `VmPeak` status field
+        int64_t vm_rss = 0; // from the `VmRSS` status field
+    };
+
+    static int parse_int(const std::string& state_key);
+    static int64_t parse_int64(const std::string& state_key);
+    static std::string parse_string(const std::string& state_key);
+    // Value is given in bytes (no unit) or in kB; the result is in bytes, -1 on failure.
+    static int64_t parse_bytes(const std::string& state_key);
+
+    // Reload the cached process status info from `/proc/self/status`.
+    static void refresh_proc_status();
+    static void get_proc_status(ProcStatus* out);
+    // Returns the cached VmRSS (physical memory in use) in bytes, or -1 if it is not available.
+    static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); }
+
 private:
     // Copy constructor and assignment not allowed
     PerfCounters(const PerfCounters&);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org