You are viewing a plain-text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/13 14:22:26 UTC

[incubator-doris] branch master updated: [Profile] Add cpu time cost in query audit (#5051)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff4bd12  [Profile] Add cpu time cost in query audit (#5051)
ff4bd12 is described below

commit ff4bd1223fc95a1e8f5281345388fc69942d7712
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Sun Dec 13 22:22:15 2020 +0800

    [Profile] Add cpu time cost in query audit (#5051)
---
 be/src/exec/olap_scan_node.cpp                     |  3 +++
 be/src/exec/olap_scan_node.h                       |  1 +
 be/src/runtime/plan_fragment_executor.cpp          | 21 +++++++++++++++------
 be/src/runtime/plan_fragment_executor.h            |  5 ++++-
 be/src/runtime/query_statistics.h                  |  9 ++++++++-
 be/src/util/runtime_profile.h                      |  1 +
 be/src/util/stopwatch.hpp                          | 22 +++++++++++++++++-----
 be/src/util/time.h                                 |  1 +
 .../java/org/apache/doris/plugin/AuditEvent.java   |  7 +++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java |  1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  3 +++
 .../org/apache/doris/qe/ConnectProcessorTest.java  |  1 +
 gensrc/proto/data.proto                            |  1 +
 13 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 1b8b5a8..ce79fb5 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -124,6 +124,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
     _index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1");
 
     _scan_timer = ADD_TIMER(_scanner_profile, "ScanTime");
+    _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime");
 
     _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
     _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
@@ -290,6 +291,7 @@ Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
     RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
     statistics->add_scan_bytes(_read_compressed_counter->value());
     statistics->add_scan_rows(_raw_rows_counter->value());
+    statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
     return Status::OK();
 }
 
@@ -1308,6 +1310,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 }
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
+    SCOPED_CPU_TIMER(_scan_cpu_timer);
     Status status = Status::OK();
     bool eos = false;
     RuntimeState* state = scanner->runtime_state();
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index d0fa54e..0f5cbb6 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -244,6 +244,7 @@ private:
     Status _status;
     RuntimeState* _runtime_state;
     RuntimeProfile::Counter* _scan_timer;
+    RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
     RuntimeProfile::Counter* _tablet_counter;
     RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr;
     RuntimeProfile::Counter* _reader_init_timer = nullptr;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 20a4dd7..e29ee96 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -218,6 +218,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     // set up profile counters
     profile()->add_child(_plan->runtime_profile(), true, NULL);
     _rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
+    _fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
 
     _row_batch.reset(new RowBatch(_plan->row_desc(), _runtime_state->batch_size(),
                                   _runtime_state->instance_mem_tracker().get()));
@@ -264,6 +265,7 @@ Status PlanFragmentExecutor::open() {
 
 Status PlanFragmentExecutor::open_internal() {
     {
+        SCOPED_CPU_TIMER(_fragment_cpu_timer);
         SCOPED_TIMER(profile()->total_time_counter());
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
     }
@@ -271,14 +273,19 @@ Status PlanFragmentExecutor::open_internal() {
     if (_sink.get() == NULL) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(_sink->open(runtime_state()));
+    {
+        SCOPED_CPU_TIMER(_fragment_cpu_timer);
+        RETURN_IF_ERROR(_sink->open(runtime_state()));
+    }
 
     // If there is a sink, do all the work of driving it here, so that
     // when this returns the query has actually finished
     RowBatch* batch = NULL;
-
     while (true) {
-        RETURN_IF_ERROR(get_next_internal(&batch));
+        {
+            SCOPED_CPU_TIMER(_fragment_cpu_timer);
+            RETURN_IF_ERROR(get_next_internal(&batch));
+        }
 
         if (batch == NULL) {
             break;
@@ -295,9 +302,10 @@ Status PlanFragmentExecutor::open_internal() {
         }
 
         SCOPED_TIMER(profile()->total_time_counter());
+        SCOPED_CPU_TIMER(_fragment_cpu_timer);
         // Collect this plan and sub plan statistics, and send to parent plan.
         if (_collect_query_statistics_with_every_batch) {
-            collect_query_statistics();
+            _collect_query_statistics();
         }
         RETURN_IF_ERROR(_sink->send(runtime_state(), batch));
     }
@@ -315,7 +323,7 @@ Status PlanFragmentExecutor::open_internal() {
     // audit the sinks to check that this is ok, or change that behaviour.
     {
         SCOPED_TIMER(profile()->total_time_counter());
-        collect_query_statistics();
+        _collect_query_statistics();
         Status status;
         {
             boost::lock_guard<boost::mutex> l(_status_lock);
@@ -337,9 +345,10 @@ Status PlanFragmentExecutor::open_internal() {
     return Status::OK();
 }
 
-void PlanFragmentExecutor::collect_query_statistics() {
+void PlanFragmentExecutor::_collect_query_statistics() {
     _query_statistics->clear();
     _plan->collect_query_statistics(_query_statistics.get());
+    _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
 }
 
 void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index e2406fe..37f544a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -28,6 +28,7 @@
 #include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "util/hash_util.hpp"
+#include "util/time.h"
 
 namespace doris {
 
@@ -205,6 +206,8 @@ private:
     // Number of rows returned by this fragment
     RuntimeProfile::Counter* _rows_produced_counter;
 
+    RuntimeProfile::Counter* _fragment_cpu_timer;
+
     // Average number of thread tokens for the duration of the plan fragment execution.
     // Fragments that do a lot of cpu work (non-coordinator fragment) will have at
     // least 1 token.  Fragments that contain a hdfs scan node will have 1+ tokens
@@ -257,7 +260,7 @@ private:
 
     const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
 
-    void collect_query_statistics();
+    void _collect_query_statistics();
 };
 
 // Save the common components of fragments in a query.
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index 09ef828..92234e8 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -32,17 +32,20 @@ class QueryStatisticsRecvr;
 // or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
 class QueryStatistics {
 public:
-    QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) {}
+    QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0) {}
 
     void merge(const QueryStatistics& other) {
         scan_rows += other.scan_rows;
         scan_bytes += other.scan_bytes;
+        cpu_ms += other.cpu_ms;
     }
 
     void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; }
 
     void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; }
 
+    void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
+
     void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
 
     void merge(QueryStatisticsRecvr* recvr);
@@ -50,6 +53,7 @@ public:
     void clear() {
         scan_rows = 0;
         scan_bytes = 0;
+        cpu_ms = 0;
         returned_rows = 0;
     }
 
@@ -57,17 +61,20 @@ public:
         DCHECK(statistics != nullptr);
         statistics->set_scan_rows(scan_rows);
         statistics->set_scan_bytes(scan_bytes);
+        statistics->set_cpu_ms(cpu_ms);
         statistics->set_returned_rows(returned_rows);
     }
 
     void merge_pb(const PQueryStatistics& statistics) {
         scan_rows += statistics.scan_rows();
         scan_bytes += statistics.scan_bytes();
+        cpu_ms += statistics.cpu_ms();
     }
 
 private:
     int64_t scan_rows;
     int64_t scan_bytes;
+    int64_t cpu_ms;
     // number rows returned by query.
     // only set once by result sink when closing.
     int64_t returned_rows;
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 46659c4..2bd2da7 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -48,6 +48,7 @@ namespace doris {
 #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS)
 #define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent)
 #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
+#define SCOPED_CPU_TIMER(c) ScopedTimer<ThreadCpuStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
 #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
     ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
 #define SCOPED_RAW_TIMER(c) \
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 1d52d86..53905a5 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -29,16 +29,17 @@ namespace doris {
 // it is not affected by user setting the system clock.
 // CLOCK_MONOTONIC represents monotonic time since some unspecified starting point.
 // It is good for computing elapsed time.
-class MonotonicStopWatch {
+template <clockid_t Clock>
+class CustomStopWatch {
 public:
-    MonotonicStopWatch() {
+    CustomStopWatch() {
         _total_time = 0;
         _running = false;
     }
 
     void start() {
         if (!_running) {
-            clock_gettime(CLOCK_MONOTONIC, &_start);
+            clock_gettime(Clock, &_start);
             _running = true;
         }
     }
@@ -55,7 +56,7 @@ public:
         uint64_t ret = elapsed_time();
 
         if (_running) {
-            clock_gettime(CLOCK_MONOTONIC, &_start);
+            clock_gettime(Clock, &_start);
         }
 
         return ret;
@@ -68,7 +69,7 @@ public:
         }
 
         timespec end;
-        clock_gettime(CLOCK_MONOTONIC, &end);
+        clock_gettime(Clock, &end);
         return (end.tv_sec - _start.tv_sec) * 1000L * 1000L * 1000L +
                (end.tv_nsec - _start.tv_nsec);
     }
@@ -79,6 +80,17 @@ private:
     bool _running;
 };
 
+// Stop watch for reporting elapsed time in nanoseconds based on CLOCK_MONOTONIC.
+// It is as fast as RDTSC.
+// It is also accurate because it is not affected by CPU frequency changes and
+// it is not affected by the user setting the system clock.
+// CLOCK_MONOTONIC represents monotonic time since some unspecified starting point.
+// It is good for computing elapsed time.
+using MonotonicStopWatch = CustomStopWatch<CLOCK_MONOTONIC>;
+
+// Stop watch reporting the CPU time (in nanoseconds) consumed by the calling thread, based on CLOCK_THREAD_CPUTIME_ID; start and read it on the same thread.
+using ThreadCpuStopWatch = CustomStopWatch<CLOCK_THREAD_CPUTIME_ID>;
+
 }
 
 #endif
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 26e54ea..f54dc82 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -24,6 +24,7 @@
 #include <string>
 
 #define NANOS_PER_SEC 1000000000ll
+#define NANOS_PER_MILLIS  1000000ll
 #define NANOS_PER_MICRO 1000ll
 #define MICROS_PER_SEC 1000000ll
 #define MICROS_PER_MILLI 1000ll
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index ebaf3da..1e9f9a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -75,6 +75,8 @@ public class AuditEvent {
     public String feIp = "";
     @AuditField(value = "Stmt")
     public String stmt = "";
+    @AuditField(value = "CpuTimeMS")
+    public long cpuTimeMs = -1;
 
     public static class AuditEventBuilder {
 
@@ -127,6 +129,11 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setCpuTimeMs(long cpuTimeMs) {
+            auditEvent.cpuTimeMs = cpuTimeMs;
+            return this;
+        }
+
         public AuditEventBuilder setScanRows(long scanRows) {
             auditEvent.scanRows = scanRows;
             return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 993de8e..e32cbee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -115,6 +115,7 @@ public class ConnectProcessor {
             .setState(ctx.getState().toString()).setQueryTime(elapseMs)
             .setScanBytes(statistics == null ? 0 : statistics.scan_bytes)
             .setScanRows(statistics == null ? 0 : statistics.scan_rows)
+            .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms)
             .setReturnRows(ctx.getReturnRows())
             .setStmtId(ctx.getStmtId())
             .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c894048..b176dc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1125,6 +1125,9 @@ public class StmtExecutor {
         if (statisticsForAuditLog.scan_rows == null) {
             statisticsForAuditLog.scan_rows = 0L;
         }
+        if (statisticsForAuditLog.cpu_ms == null) {
+            statisticsForAuditLog.cpu_ms = 0L;
+        }
         return statisticsForAuditLog;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index 11bf6f2..7421608 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -100,6 +100,7 @@ public class ConnectProcessorTest {
 
         statistics.scan_bytes = 0L;
         statistics.scan_rows = 0L;
+        statistics.cpu_ms = 0L;
 
         MetricRepo.init();
     }
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 09af10c..d380031 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -24,6 +24,7 @@ message PQueryStatistics {
     optional int64 scan_rows = 1;
     optional int64 scan_bytes = 2;
     optional int64 returned_rows = 3;
+    optional int64 cpu_ms = 4;
 }
 
 message PRowBatch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org