You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/18 06:31:41 UTC
[doris] branch dev-1.1.1 updated: [bugfix 1.1.1]fix_physical_mem_limit (#10967)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new d33dd7dc18 [bugfix 1.1.1]fix_physical_mem_limit (#10967)
d33dd7dc18 is described below
commit d33dd7dc18ce19549e24a7db4c6dcba297137dc4
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Mon Jul 18 14:31:36 2022 +0800
[bugfix 1.1.1]fix_physical_mem_limit (#10967)
---
be/src/runtime/mem_tracker.h | 7 +++-
be/src/service/doris_main.cpp | 4 +-
be/src/util/CMakeLists.txt | 2 +-
be/src/util/mem_info.h | 2 +
be/src/util/perf_counters.cpp | 97 ++++++++++++++++++++++++++++++++++++-------
be/src/util/perf_counters.h | 22 ++++++++++
6 files changed, 117 insertions(+), 17 deletions(-)
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 1622a70e71..9166462aba 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -32,6 +32,7 @@
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "util/mem_info.h"
#include "util/metrics.h"
+#include "util/perf_counters.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
@@ -173,7 +174,11 @@ public:
Release(-bytes);
return Status::OK();
}
- if (MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
+ // Limit process memory usage by the actual physical memory (VmRSS) of the process, as read from `/proc/self/status`.
+ // This is independent of the consumption value of the mem tracker, which counts the virtual memory
+ // that the process obtained through malloc.
+ // For speed, MemInfo::initialized() is expected to be true here.
+ if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
return Status::MemoryLimitExceeded(fmt::format(
"{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}",
label_, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index d8c4695b25..59fbdc8330 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -57,6 +57,7 @@
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/logging.h"
+#include "util/perf_counters.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_server.h"
#include "util/uid_util.h"
@@ -475,7 +476,8 @@ int main(int argc, char** argv) {
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_current_mem();
#endif
- sleep(10);
+ doris::PerfCounters::refresh_proc_status();
+ sleep(1);
}
http_service.stop();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 0582c57b80..59bf916ee2 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -47,7 +47,7 @@ set(UTIL_FILES
parse_util.cpp
path_builder.cpp
# TODO: not supported on RHEL 5
-# perf-counters.cpp
+ perf_counters.cpp
progress_updater.cpp
runtime_profile.cpp
static_asserts.cpp
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 6ae8669f86..53972ee925 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -42,6 +42,8 @@ public:
static inline size_t current_mem() { return _s_current_mem; }
+ // The tcmalloc property `generic.total_physical_bytes` records the total size of the virtual memory
+ // that the process obtained through malloc, not the physical memory the process actually uses in the OS.
static inline void refresh_current_mem() {
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
&_s_current_mem);
diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp
index 5b7171d065..fc62255f9d 100644
--- a/be/src/util/perf_counters.cpp
+++ b/be/src/util/perf_counters.cpp
@@ -23,12 +23,17 @@
#include <string.h>
#include <sys/syscall.h>
+#include <boost/algorithm/string/trim.hpp>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
+#include "gutil/strings/substitute.h"
#include "util/debug_util.h"
+#include "util/pretty_printer.h"
+#include "util/string_parser.hpp"
+#include "util/string_util.h"
namespace doris {
@@ -36,6 +41,8 @@ namespace doris {
#define BUFFER_SIZE 256
#define PRETTY_PRINT_WIDTH 13
+static std::unordered_map<std::string, std::string> _process_state;
+
// This is the order of the counters in /proc/self/io
enum PERF_IO_IDX {
PROC_IO_READ = 0,
@@ -123,7 +130,7 @@ static bool init_event_attr(perf_event_attr* attr, PerfCounters::Counter counter
return true;
}
-static string get_counter_name(PerfCounters::Counter counter) {
+static std::string get_counter_name(PerfCounters::Counter counter) {
switch (counter) {
case PerfCounters::PERF_COUNTER_SW_CPU_CLOCK:
return "CPUTime";
@@ -275,7 +282,7 @@ bool PerfCounters::init_proc_self_status_counter(Counter counter) {
return true;
}
-bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_sys_counters(std::vector<int64_t>& buffer) {
for (int i = 0; i < _counters.size(); i++) {
if (_counters[i].source == SYS_PERF_COUNTER) {
int num_bytes = read(_counters[i].fd, &buffer[i], COUNTER_SIZE);
@@ -303,7 +310,7 @@ bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
// read_bytes: 0
// write_bytes: 0
// cancelled_write_bytes: 0
-bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_io_counters(std::vector<int64_t>& buffer) {
std::ifstream file("/proc/self/io", std::ios::in);
std::string buf;
int64_t values[PROC_IO_LAST_COUNTER];
@@ -343,9 +350,9 @@ bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
return true;
}
-bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_status_counters(std::vector<int64_t>& buffer) {
std::ifstream file("/proc/self/status", std::ios::in);
- string buf;
+ std::string buf;
while (file) {
getline(file, buf);
@@ -354,13 +361,13 @@ bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
if (_counters[i].source == PROC_SELF_STATUS) {
size_t field = buf.find(_counters[i].proc_status_field);
- if (field == string::npos) {
+ if (field == std::string::npos) {
continue;
}
size_t colon = field + _counters[i].proc_status_field.size() + 1;
buf = buf.substr(colon + 1);
- istringstream stream(buf);
+ std::istringstream stream(buf);
int64_t value;
stream >> value;
buffer[i] = value * 1024; // values in file are in kb
@@ -455,12 +462,12 @@ bool PerfCounters::add_counter(Counter counter) {
}
// Query all the counters right now and store the values in results
-void PerfCounters::snapshot(const string& name) {
+void PerfCounters::snapshot(const std::string& name) {
if (_counters.size() == 0) {
return;
}
- string fixed_name = name;
+ std::string fixed_name = name;
if (fixed_name.size() == 0) {
std::stringstream ss;
@@ -486,22 +493,22 @@ const std::vector<int64_t>* PerfCounters::counters(int snapshot) const {
return &_snapshots[snapshot];
}
-void PerfCounters::pretty_print(ostream* s) const {
+void PerfCounters::pretty_print(std::ostream* s) const {
std::ostream& stream = *s;
- std::stream << setw(8) << "snapshot";
+ stream << std::setw(8) << "snapshot";
for (int i = 0; i < _counter_names.size(); ++i) {
- stream << setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
+ stream << std::setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
}
stream << std::endl;
for (int s = 0; s < _snapshots.size(); s++) {
- stream << setw(8) << _snapshot_names[s];
+ stream << std::setw(8) << _snapshot_names[s];
const std::vector<int64_t>& snapshot = _snapshots[s];
for (int i = 0; i < snapshot.size(); ++i) {
- stream << setw(PRETTY_PRINT_WIDTH)
+ stream << std::setw(PRETTY_PRINT_WIDTH)
<< PrettyPrinter::print(snapshot[i], _counters[i].type);
}
@@ -511,4 +518,66 @@ void PerfCounters::pretty_print(ostream* s) const {
stream << std::endl;
}
+// Refactor below
+
+int PerfCounters::parse_int(const string& state_key) {
+ auto it = _process_state.find(state_key);
+ if (it != _process_state.end()) return atoi(it->second.c_str());
+ return -1;
+}
+
+int64_t PerfCounters::parse_int64(const string& state_key) {
+ auto it = _process_state.find(state_key);
+ if (it != _process_state.end()) {
+ StringParser::ParseResult result;
+ int64_t state_value =
+ StringParser::string_to_int<int64_t>(it->second.data(), it->second.size(), &result);
+ if (result == StringParser::PARSE_SUCCESS) return state_value;
+ }
+ return -1;
+}
+
+string PerfCounters::parse_string(const string& state_key) {
+ auto it = _process_state.find(state_key);
+ if (it != _process_state.end()) return it->second;
+ return string();
+}
+
+int64_t PerfCounters::parse_bytes(const string& state_key) {
+ auto it = _process_state.find(state_key);
+ if (it != _process_state.end()) {
+ vector<string> fields = split(it->second, " ");
+ // We expect state_value such as '16129508' (bytes) or '16129508 kB'; any other unit (e.g. 'mB') is not handled and yields -1.
+ StringParser::ParseResult result;
+ int64_t state_value =
+ StringParser::string_to_int<int64_t>(fields[0].data(), fields[0].size(), &result);
+ if (result == StringParser::PARSE_SUCCESS) {
+ if (fields.size() < 2) return state_value;
+ if (fields[1].compare("kB") == 0) return state_value * 1024L;
+ }
+ }
+ return -1;
+}
+
+void PerfCounters::refresh_proc_status() {
+ std::ifstream statusinfo("/proc/self/status", std::ios::in);
+ std::string line;
+ while (statusinfo.good() && !statusinfo.eof()) {
+ getline(statusinfo, line);
+ std::vector<std::string> fields = split(line, "\t");
+ if (fields.size() < 2) continue;
+ boost::algorithm::trim(fields[1]);
+ std::string key = fields[0].substr(0, fields[0].size() - 1);
+ _process_state[strings::Substitute("status/$0", key)] = fields[1];
+ }
+
+ if (statusinfo.is_open()) statusinfo.close();
+}
+
+void PerfCounters::get_proc_status(ProcStatus* out) {
+ out->vm_size = parse_bytes("status/VmSize");
+ out->vm_peak = parse_bytes("status/VmPeak");
+ out->vm_rss = parse_bytes("status/VmRSS");
+}
+
} // namespace doris
diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h
index aa4c4bf358..ca130e511c 100644
--- a/be/src/util/perf_counters.h
+++ b/be/src/util/perf_counters.h
@@ -39,6 +39,8 @@
// <do your work>
// counters.snapshot("After Work");
// counters.PrettyPrint(cout);
+//
+// TODO: PerfCounters is expected to be refactored into ProcessState.
namespace doris {
@@ -93,6 +95,26 @@ public:
PerfCounters();
~PerfCounters();
+ // Refactor
+
+ struct ProcStatus {
+ int64_t vm_size = 0;
+ int64_t vm_peak = 0;
+ int64_t vm_rss = 0;
+ };
+
+ static int parse_int(const std::string& state_key);
+ static int64_t parse_int64(const std::string& state_key);
+ static std::string parse_string(const std::string& state_key);
+ // Returns the value in bytes. The raw value is either unit-less (taken as bytes) or in kB; returns -1 otherwise.
+ static int64_t parse_bytes(const std::string& state_key);
+
+ // Refresh the cached process status info by re-reading `/proc/self/status`.
+ static void refresh_proc_status();
+ static void get_proc_status(ProcStatus* out);
+ // Return the process's actual physical memory (VmRSS) in bytes, as cached by the last refresh_proc_status() call; -1 if unavailable.
+ static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); }
+
private:
// Copy constructor and assignment not allowed
PerfCounters(const PerfCounters&);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org