You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/22 06:46:26 UTC
[incubator-doris] branch master updated: [feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 836c95c [feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)
836c95c is described below
commit 836c95c2caf9f8a5508ebb63247c7ec22e2e9dab
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Mon Nov 22 14:46:08 2021 +0800
[feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)
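Add a new field `peakMemoryBytes` to fe.audit.log. Its value is the maximum, over all backends that executed the query, of the peak memory recorded for that backend.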
---
be/src/runtime/buffer_control_block.h | 7 +++
be/src/runtime/mem_tracker.h | 2 +-
be/src/runtime/plan_fragment_executor.cpp | 12 ++++
be/src/runtime/plan_fragment_executor.h | 2 +
be/src/runtime/query_statistics.cpp | 70 ++++++++++++++++++++++
be/src/runtime/query_statistics.h | 68 +++++++++++++++------
be/src/runtime/result_sink.cpp | 1 +
be/src/runtime/runtime_state.cpp | 2 +-
be/src/runtime/runtime_state.h | 6 ++
.../java/org/apache/doris/plugin/AuditEvent.java | 8 +++
.../java/org/apache/doris/qe/ConnectProcessor.java | 1 +
.../main/java/org/apache/doris/qe/Coordinator.java | 1 +
.../java/org/apache/doris/qe/StmtExecutor.java | 1 +
gensrc/proto/data.proto | 7 +++
gensrc/thrift/PaloInternalService.thrift | 1 +
15 files changed, 169 insertions(+), 20 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index aad8928..95831e5 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -97,6 +97,13 @@ public:
}
}
+ void update_max_peak_memory_bytes() {
+ if (_query_statistics.get() != nullptr) {
+ int64_t max_peak_memory_bytes = _query_statistics->calculate_max_peak_memory_bytes();
+ _query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+ }
+ }
+
private:
typedef std::list<std::unique_ptr<TFetchDataResult>> ResultQueue;
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 172ff20..260021a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -411,7 +411,7 @@ public:
return msg.str();
}
- bool is_consumption_metric_null() { return consumption_metric_ == nullptr; }
+ bool is_consumption_metric_null() const { return consumption_metric_ == nullptr; }
static const std::string COUNTER_NAME;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index c2b3d52..a2ac3a3 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -83,6 +83,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
_runtime_state->set_be_number(request.backend_num);
+ if (request.__isset.backend_id) {
+ _runtime_state->set_backend_id(request.backend_id);
+ }
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
@@ -340,6 +343,15 @@ void PlanFragmentExecutor::_collect_query_statistics() {
_query_statistics->clear();
_plan->collect_query_statistics(_query_statistics.get());
_query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
+ if (_runtime_state->backend_id() != -1) {
+ _collect_node_statistics();
+ }
+}
+
+void PlanFragmentExecutor::_collect_node_statistics() {
+ DCHECK(_runtime_state->backend_id() != -1);
+ NodeStatistics* node_statistics = _query_statistics->add_nodes_statistics(_runtime_state->backend_id());
+ node_statistics->add_peak_memory(_mem_tracker->peak_consumption());
}
void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index b4c9725..6280ab8 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -247,6 +247,8 @@ private:
const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
void _collect_query_statistics();
+
+ void _collect_node_statistics();
};
} // namespace doris
diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp
index 9d58594..a3961b3 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -19,10 +19,80 @@
namespace doris {
+void NodeStatistics::merge(const NodeStatistics& other) {
+ peak_memory_bytes += other.peak_memory_bytes;
+}
+
+void NodeStatistics::to_pb(PNodeStatistics* node_statistics) {
+ DCHECK(node_statistics != nullptr);
+ node_statistics->set_peak_memory_bytes(peak_memory_bytes);
+}
+
+void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) {
+ peak_memory_bytes = node_statistics.peak_memory_bytes();
+}
+
+void QueryStatistics::merge(const QueryStatistics& other) {
+ scan_rows += other.scan_rows;
+ scan_bytes += other.scan_bytes;
+ cpu_ms += other.cpu_ms;
+ for (auto& other_node_statistics : other._nodes_statistics_map) {
+ int64_t node_id = other_node_statistics.first;
+ auto node_statistics = add_nodes_statistics(node_id);
+ node_statistics->merge(*other_node_statistics.second);
+ }
+}
+
+void QueryStatistics::to_pb(PQueryStatistics* statistics) {
+ DCHECK(statistics != nullptr);
+ statistics->set_scan_rows(scan_rows);
+ statistics->set_scan_bytes(scan_bytes);
+ statistics->set_cpu_ms(cpu_ms);
+ statistics->set_returned_rows(returned_rows);
+ statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+ for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) {
+ auto node_statistics = statistics->add_nodes_statistics();
+ node_statistics->set_node_id(iter->first);
+ iter->second->to_pb(node_statistics);
+ }
+}
+
+void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
+ scan_rows = statistics.scan_rows();
+ scan_bytes = statistics.scan_bytes();
+ cpu_ms = statistics.cpu_ms();
+ for (auto& p_node_statistics : statistics.nodes_statistics()) {
+ int64_t node_id = p_node_statistics.node_id();
+ auto node_statistics = add_nodes_statistics(node_id);
+ node_statistics->from_pb(p_node_statistics);
+ }
+}
+
+int64_t QueryStatistics::calculate_max_peak_memory_bytes() {
+ int64_t max_peak_memory_bytes = 0;
+ for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) {
+ if (max_peak_memory_bytes < iter->second->peak_memory_bytes) {
+ max_peak_memory_bytes = iter->second->peak_memory_bytes;
+ }
+ }
+ return max_peak_memory_bytes;
+}
+
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
recvr->merge(this);
}
+void QueryStatistics::clearNodeStatistics() {
+ for (auto& pair : _nodes_statistics_map) {
+ delete pair.second;
+ }
+ _nodes_statistics_map.clear();
+}
+
+QueryStatistics::~QueryStatistics() {
+ clearNodeStatistics();
+}
+
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
std::lock_guard<SpinLock> l(_lock);
QueryStatistics* query_statistics = nullptr;
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index ab25007..40feedc 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -25,20 +25,35 @@
namespace doris {
+class QueryStatistics;
class QueryStatisticsRecvr;
+class NodeStatistics {
+public:
+ NodeStatistics() : peak_memory_bytes(0) {};
+
+ void add_peak_memory(int64_t peak_memory) { this->peak_memory_bytes += peak_memory; }
+
+ void merge(const NodeStatistics& other);
+
+ void to_pb(PNodeStatistics* node_statistics);
+
+ void from_pb(const PNodeStatistics& node_statistics);
+
+private:
+ friend class QueryStatistics;
+ int64_t peak_memory_bytes;
+};
+
// This is responsible for collecting query statistics, usually it consists of
// two parts, one is current fragment or plan's statistics, the other is sub fragment
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
class QueryStatistics {
public:
- QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0) {}
+ QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), max_peak_memory_bytes(0) {}
+ ~QueryStatistics();
- void merge(const QueryStatistics& other) {
- scan_rows += other.scan_rows;
- scan_bytes += other.scan_bytes;
- cpu_ms += other.cpu_ms;
- }
+ void merge(const QueryStatistics& other);
void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; }
@@ -46,30 +61,41 @@ public:
void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
+ NodeStatistics* add_nodes_statistics(int64_t node_id) {
+ NodeStatistics* nodeStatistics = nullptr;
+ auto iter = _nodes_statistics_map.find(node_id);
+ if (iter == _nodes_statistics_map.end()) {
+ nodeStatistics = new NodeStatistics;
+ _nodes_statistics_map[node_id] = nodeStatistics;
+ } else {
+ nodeStatistics = iter->second;
+ }
+ return nodeStatistics;
+ }
+
void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
+ void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { this->max_peak_memory_bytes = max_peak_memory_bytes; }
+
void merge(QueryStatisticsRecvr* recvr);
+ // Get the maximum value from the peak memory collected by all node statistics
+ int64_t calculate_max_peak_memory_bytes();
+
+ void clearNodeStatistics();
+
void clear() {
scan_rows = 0;
scan_bytes = 0;
cpu_ms = 0;
returned_rows = 0;
+ max_peak_memory_bytes = 0;
+ clearNodeStatistics();
}
- void to_pb(PQueryStatistics* statistics) {
- DCHECK(statistics != nullptr);
- statistics->set_scan_rows(scan_rows);
- statistics->set_scan_bytes(scan_bytes);
- statistics->set_cpu_ms(cpu_ms);
- statistics->set_returned_rows(returned_rows);
- }
+ void to_pb(PQueryStatistics* statistics);
- void from_pb(const PQueryStatistics& statistics) {
- scan_rows = statistics.scan_rows();
- scan_bytes = statistics.scan_bytes();
- cpu_ms = statistics.cpu_ms();
- }
+ void from_pb(const PQueryStatistics& statistics);
private:
int64_t scan_rows;
@@ -78,6 +104,12 @@ private:
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
+ // Maximum memory peak for all backends.
+ // only set once by result sink when closing.
+ int64_t max_peak_memory_bytes;
+ // The statistics of the query on each backend.
+ typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+ NodeStatisticsMap _nodes_statistics_map;
};
// It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index 49870e6..1392037 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -118,6 +118,7 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) {
// close sender, this is normal path end
if (_sender) {
_sender->update_num_written_rows(_writer->get_written_rows());
+ _sender->update_max_peak_memory_bytes();
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4b6d1e8..8ac3378 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -201,7 +201,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1;
// we do not use global query-map for now, to avoid mem-exceeded different fragments
// running on the same machine.
- // TODO(lingbin): open it later. note that open with BufferedBlcokMgr's BlockMgrsMap
+ // TODO(lingbin): open it later. note that open with BufferedBlockMgr's BlockMgrsMap
// at the same time.
// _query_mem_tracker = MemTracker::get_query_mem_tracker(
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 30179eb..5551f31 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -200,6 +200,9 @@ public:
int codegen_level() const { return _query_options.codegen_level; }
void set_is_cancelled(bool v) { _is_cancelled = v; }
+ void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
+ int64_t backend_id() const { return _backend_id; }
+
void set_be_number(int be_number) { _be_number = be_number; }
int be_number(void) { return _be_number; }
@@ -451,6 +454,9 @@ private:
int _per_fragment_instance_idx;
int _num_per_fragment_instances = 0;
+ // The backend id on which this fragment instance runs
+ int64_t _backend_id = -1;
+
// used as send id
int _be_number;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 0054e7e..500d6f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -17,6 +17,7 @@
package org.apache.doris.plugin;
+
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -81,6 +82,8 @@ public class AuditEvent {
public long cpuTimeMs = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
+ @AuditField(value = "peakMemoryBytes")
+ public long peakMemoryBytes = -1;
public static class AuditEventBuilder {
@@ -138,6 +141,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setPeakMemoryBytes(long peakMemoryBytes) {
+ auditEvent.peakMemoryBytes = peakMemoryBytes;
+ return this;
+ }
+
public AuditEventBuilder setScanRows(long scanRows) {
auditEvent.scanRows = scanRows;
return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index eaac1e4..01df0c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -119,6 +119,7 @@ public class ConnectProcessor {
.setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+ .setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 6dca441..ad0a9d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -542,6 +542,7 @@ public class Coordinator {
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
// and the BE will determine whether all Fragments have been executed based on this information.
tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
+ tParam.setBackendId(execState.backend.getId());
backendExecStates.add(execState);
if (needCheckBackendState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f99205d..5fc6993 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1392,6 +1392,7 @@ public class StmtExecutor implements ProfileWriter {
eofPacket.writeTo(serializer);
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
+
public void sendResult(ResultSet resultSet) throws IOException {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index ca62757..6e4ae7d 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -20,11 +20,18 @@ syntax="proto2";
package doris;
option java_package = "org.apache.doris.proto";
+message PNodeStatistics {
+ required int64 node_id = 1;
+ optional int64 peak_memory_bytes = 2;
+}
+
message PQueryStatistics {
optional int64 scan_rows = 1;
optional int64 scan_bytes = 2;
optional int64 returned_rows = 3;
optional int64 cpu_ms = 4;
+ optional int64 max_peak_memory_bytes = 5;
+ repeated PNodeStatistics nodes_statistics = 6;
}
message PRowBatch {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e3384b4..4ded4b1 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -322,6 +322,7 @@ struct TExecPlanFragmentParams {
// If this field is unset or it set to false, all @Common components is set.
16: optional bool is_simplified_param
17: optional TTxnParams txn_conf
+ 18: optional i64 backend_id
}
struct TExecPlanFragmentResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org