You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/22 06:46:26 UTC

[incubator-doris] branch master updated: [feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 836c95c  [feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)
836c95c is described below

commit 836c95c2caf9f8a5508ebb63247c7ec22e2e9dab
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Mon Nov 22 14:46:08 2021 +0800

    [feat](memory-track) Print peak memory use of all backend after query in audit log (#7030)
    
    Add a new field `peakMemoryBytes` to fe.audit.log, holding the maximum peak memory across all backends for the query.
---
 be/src/runtime/buffer_control_block.h              |  7 +++
 be/src/runtime/mem_tracker.h                       |  2 +-
 be/src/runtime/plan_fragment_executor.cpp          | 12 ++++
 be/src/runtime/plan_fragment_executor.h            |  2 +
 be/src/runtime/query_statistics.cpp                | 70 ++++++++++++++++++++++
 be/src/runtime/query_statistics.h                  | 68 +++++++++++++++------
 be/src/runtime/result_sink.cpp                     |  1 +
 be/src/runtime/runtime_state.cpp                   |  2 +-
 be/src/runtime/runtime_state.h                     |  6 ++
 .../java/org/apache/doris/plugin/AuditEvent.java   |  8 +++
 .../java/org/apache/doris/qe/ConnectProcessor.java |  1 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  1 +
 gensrc/proto/data.proto                            |  7 +++
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 15 files changed, 169 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index aad8928..95831e5 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -97,6 +97,13 @@ public:
         }
     }
 
+    void update_max_peak_memory_bytes() {
+        if (_query_statistics.get() != nullptr) {
+            int64_t max_peak_memory_bytes = _query_statistics->calculate_max_peak_memory_bytes();
+            _query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+        }
+    }
+
 private:
     typedef std::list<std::unique_ptr<TFetchDataResult>> ResultQueue;
 
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 172ff20..260021a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -411,7 +411,7 @@ public:
         return msg.str();
     }
 
-    bool is_consumption_metric_null() { return consumption_metric_ == nullptr; }
+    bool is_consumption_metric_null() const { return consumption_metric_ == nullptr; }
 
     static const std::string COUNTER_NAME;
 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index c2b3d52..a2ac3a3 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -83,6 +83,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
     _runtime_state->set_be_number(request.backend_num);
+    if (request.__isset.backend_id) {
+        _runtime_state->set_backend_id(request.backend_id);
+    }
     if (request.__isset.import_label) {
         _runtime_state->set_import_label(request.import_label);
     }
@@ -340,6 +343,15 @@ void PlanFragmentExecutor::_collect_query_statistics() {
     _query_statistics->clear();
     _plan->collect_query_statistics(_query_statistics.get());
     _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
+    if (_runtime_state->backend_id() != -1) {
+        _collect_node_statistics();
+    }
+}
+
+void PlanFragmentExecutor::_collect_node_statistics() {
+    DCHECK(_runtime_state->backend_id() != -1);
+    NodeStatistics* node_statistics = _query_statistics->add_nodes_statistics(_runtime_state->backend_id());
+    node_statistics->add_peak_memory(_mem_tracker->peak_consumption());
 }
 
 void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index b4c9725..6280ab8 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -247,6 +247,8 @@ private:
     const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
 
     void _collect_query_statistics();
+
+    void _collect_node_statistics();
 };
 
 } // namespace doris
diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp
index 9d58594..a3961b3 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -19,10 +19,80 @@
 
 namespace doris {
 
+void NodeStatistics::merge(const NodeStatistics& other) {
+    peak_memory_bytes += other.peak_memory_bytes;
+}
+
+void NodeStatistics::to_pb(PNodeStatistics* node_statistics) {
+    DCHECK(node_statistics != nullptr);
+    node_statistics->set_peak_memory_bytes(peak_memory_bytes);
+}
+
+void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) {
+    peak_memory_bytes = node_statistics.peak_memory_bytes();
+}
+
+void QueryStatistics::merge(const QueryStatistics& other) {
+    scan_rows += other.scan_rows;
+    scan_bytes += other.scan_bytes;
+    cpu_ms += other.cpu_ms;
+    for (auto& other_node_statistics : other._nodes_statistics_map) {
+        int64_t node_id = other_node_statistics.first;
+        auto node_statistics = add_nodes_statistics(node_id);
+        node_statistics->merge(*other_node_statistics.second);
+    }
+}
+
+void QueryStatistics::to_pb(PQueryStatistics* statistics) {
+    DCHECK(statistics != nullptr);
+    statistics->set_scan_rows(scan_rows);
+    statistics->set_scan_bytes(scan_bytes);
+    statistics->set_cpu_ms(cpu_ms);
+    statistics->set_returned_rows(returned_rows);
+    statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+    for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) {
+        auto node_statistics = statistics->add_nodes_statistics();
+        node_statistics->set_node_id(iter->first);
+        iter->second->to_pb(node_statistics);
+    }
+}
+
+void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
+    scan_rows = statistics.scan_rows();
+    scan_bytes = statistics.scan_bytes();
+    cpu_ms = statistics.cpu_ms();
+    for (auto& p_node_statistics : statistics.nodes_statistics()) {
+        int64_t node_id = p_node_statistics.node_id();
+        auto node_statistics = add_nodes_statistics(node_id);
+        node_statistics->from_pb(p_node_statistics);
+    }
+}
+
+int64_t QueryStatistics::calculate_max_peak_memory_bytes() {
+    int64_t max_peak_memory_bytes = 0;
+    for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) {
+        if (max_peak_memory_bytes < iter->second->peak_memory_bytes) {
+            max_peak_memory_bytes = iter->second->peak_memory_bytes;
+        }
+    }
+    return max_peak_memory_bytes;
+}
+
 void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
     recvr->merge(this);
 }
 
+void QueryStatistics::clearNodeStatistics() {
+    for (auto& pair : _nodes_statistics_map) {
+        delete pair.second;
+    }
+    _nodes_statistics_map.clear();
+}
+
+QueryStatistics::~QueryStatistics() {
+    clearNodeStatistics();
+}
+
 void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
     std::lock_guard<SpinLock> l(_lock);
     QueryStatistics* query_statistics = nullptr;
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index ab25007..40feedc 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -25,20 +25,35 @@
 
 namespace doris {
 
+class QueryStatistics;
 class QueryStatisticsRecvr;
 
+class NodeStatistics {
+public:
+    NodeStatistics() : peak_memory_bytes(0) {};
+
+    void add_peak_memory(int64_t peak_memory) { this->peak_memory_bytes += peak_memory; }
+
+    void merge(const NodeStatistics& other);
+
+    void to_pb(PNodeStatistics* node_statistics);
+
+    void from_pb(const PNodeStatistics& node_statistics);
+
+private:
+    friend class QueryStatistics;
+    int64_t peak_memory_bytes;
+};
+
 // This is responsible for collecting query statistics, usually it consists of
 // two parts, one is current fragment or plan's statistics, the other is sub fragment
 // or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
 class QueryStatistics {
 public:
-    QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0) {}
+    QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), max_peak_memory_bytes(0) {}
+    ~QueryStatistics();
 
-    void merge(const QueryStatistics& other) {
-        scan_rows += other.scan_rows;
-        scan_bytes += other.scan_bytes;
-        cpu_ms += other.cpu_ms;
-    }
+    void merge(const QueryStatistics& other);
 
     void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; }
 
@@ -46,30 +61,41 @@ public:
 
     void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
 
+    NodeStatistics* add_nodes_statistics(int64_t node_id) {
+        NodeStatistics* nodeStatistics = nullptr;
+        auto iter = _nodes_statistics_map.find(node_id);
+        if (iter == _nodes_statistics_map.end()) {
+            nodeStatistics = new NodeStatistics;
+            _nodes_statistics_map[node_id] = nodeStatistics;
+        } else {
+            nodeStatistics = iter->second;
+        }
+        return nodeStatistics;
+    }
+
     void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
 
+    void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { this->max_peak_memory_bytes = max_peak_memory_bytes; }
+
     void merge(QueryStatisticsRecvr* recvr);
 
+    // Get the maximum value from the peak memory collected by all node statistics
+    int64_t calculate_max_peak_memory_bytes();
+
+    void clearNodeStatistics();
+
     void clear() {
         scan_rows = 0;
         scan_bytes = 0;
         cpu_ms = 0;
         returned_rows = 0;
+        max_peak_memory_bytes = 0;
+        clearNodeStatistics();
     }
 
-    void to_pb(PQueryStatistics* statistics) {
-        DCHECK(statistics != nullptr);
-        statistics->set_scan_rows(scan_rows);
-        statistics->set_scan_bytes(scan_bytes);
-        statistics->set_cpu_ms(cpu_ms);
-        statistics->set_returned_rows(returned_rows);
-    }
+    void to_pb(PQueryStatistics* statistics);
 
-    void from_pb(const PQueryStatistics& statistics) {
-        scan_rows = statistics.scan_rows();
-        scan_bytes = statistics.scan_bytes();
-        cpu_ms = statistics.cpu_ms();
-    }
+    void from_pb(const PQueryStatistics& statistics);
 
 private:
     int64_t scan_rows;
@@ -78,6 +104,12 @@ private:
     // number rows returned by query.
     // only set once by result sink when closing.
     int64_t returned_rows;
+    // Maximum memory peak for all backends.
+    // only set once by result sink when closing.
+    int64_t max_peak_memory_bytes;
+    // The statistics of the query on each backend.
+    typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+    NodeStatisticsMap _nodes_statistics_map;
 };
 
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index 49870e6..1392037 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -118,6 +118,7 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) {
     // close sender, this is normal path end
     if (_sender) {
         _sender->update_num_written_rows(_writer->get_written_rows());
+        _sender->update_max_peak_memory_bytes();
         _sender->close(final_status);
     }
     state->exec_env()->result_mgr()->cancel_at_time(
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4b6d1e8..8ac3378 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -201,7 +201,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1;
     // we do not use global query-map  for now, to avoid mem-exceeded different fragments
     // running on the same machine.
-    // TODO(lingbin): open it later. note that open with BufferedBlcokMgr's BlockMgrsMap
+    // TODO(lingbin): open it later. note that open with BufferedBlockMgr's BlockMgrsMap
     // at the same time.
 
     // _query_mem_tracker = MemTracker::get_query_mem_tracker(
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 30179eb..5551f31 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -200,6 +200,9 @@ public:
     int codegen_level() const { return _query_options.codegen_level; }
     void set_is_cancelled(bool v) { _is_cancelled = v; }
 
+    void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
+    int64_t backend_id() const { return _backend_id; }
+
     void set_be_number(int be_number) { _be_number = be_number; }
     int be_number(void) { return _be_number; }
 
@@ -451,6 +454,9 @@ private:
     int _per_fragment_instance_idx;
     int _num_per_fragment_instances = 0;
 
+    // The backend id on which this fragment instance runs
+    int64_t _backend_id = -1;
+
     // used as send id
     int _be_number;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 0054e7e..500d6f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.plugin;
 
+
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 
@@ -81,6 +82,8 @@ public class AuditEvent {
     public long cpuTimeMs = -1;
     @AuditField(value = "SqlHash")
     public String sqlHash = "";
+    @AuditField(value = "peakMemoryBytes")
+    public long peakMemoryBytes = -1;
 
     public static class AuditEventBuilder {
 
@@ -138,6 +141,11 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setPeakMemoryBytes(long peakMemoryBytes) {
+            auditEvent.peakMemoryBytes = peakMemoryBytes;
+            return this;
+        }
+
         public AuditEventBuilder setScanRows(long scanRows) {
             auditEvent.scanRows = scanRows;
             return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index eaac1e4..01df0c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -119,6 +119,7 @@ public class ConnectProcessor {
             .setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
             .setScanRows(statistics == null ? 0 : statistics.getScanRows())
             .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+            .setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes())
             .setReturnRows(ctx.getReturnRows())
             .setStmtId(ctx.getStmtId())
             .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 6dca441..ad0a9d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -542,6 +542,7 @@ public class Coordinator {
                     // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have been executed based on this information.
                     tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
+                    tParam.setBackendId(execState.backend.getId());
 
                     backendExecStates.add(execState);
                     if (needCheckBackendState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f99205d..5fc6993 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1392,6 +1392,7 @@ public class StmtExecutor implements ProfileWriter {
         eofPacket.writeTo(serializer);
         context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
     }
+
     public void sendResult(ResultSet resultSet) throws IOException {
         context.updateReturnRows(resultSet.getResultRows().size());
         // Send meta data.
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index ca62757..6e4ae7d 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -20,11 +20,18 @@ syntax="proto2";
 package doris;
 option java_package = "org.apache.doris.proto";
 
+message PNodeStatistics {
+    required int64 node_id = 1;
+    optional int64 peak_memory_bytes = 2;
+}
+
 message PQueryStatistics {
     optional int64 scan_rows = 1;
     optional int64 scan_bytes = 2;
     optional int64 returned_rows = 3;
     optional int64 cpu_ms = 4;
+    optional int64 max_peak_memory_bytes = 5;
+    repeated PNodeStatistics nodes_statistics = 6;
 }
 
 message PRowBatch {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e3384b4..4ded4b1 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -322,6 +322,7 @@ struct TExecPlanFragmentParams {
   // If this field is unset or it set to false, all @Common components is set.
   16: optional bool is_simplified_param
   17: optional TTxnParams txn_conf
+  18: optional i64 backend_id
 }
 
 struct TExecPlanFragmentResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org