You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/13 14:22:26 UTC
[incubator-doris] branch master updated: [Profile] Add cpu time cost in query audit (#5051)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff4bd12 [Profile] Add cpu time cost in query audit (#5051)
ff4bd12 is described below
commit ff4bd1223fc95a1e8f5281345388fc69942d7712
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Sun Dec 13 22:22:15 2020 +0800
[Profile] Add cpu time cost in query audit (#5051)
---
be/src/exec/olap_scan_node.cpp | 3 +++
be/src/exec/olap_scan_node.h | 1 +
be/src/runtime/plan_fragment_executor.cpp | 21 +++++++++++++++------
be/src/runtime/plan_fragment_executor.h | 5 ++++-
be/src/runtime/query_statistics.h | 9 ++++++++-
be/src/util/runtime_profile.h | 1 +
be/src/util/stopwatch.hpp | 22 +++++++++++++++++-----
be/src/util/time.h | 1 +
.../java/org/apache/doris/plugin/AuditEvent.java | 7 +++++++
.../java/org/apache/doris/qe/ConnectProcessor.java | 1 +
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +++
.../org/apache/doris/qe/ConnectProcessorTest.java | 1 +
gensrc/proto/data.proto | 1 +
13 files changed, 63 insertions(+), 13 deletions(-)
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 1b8b5a8..ce79fb5 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -124,6 +124,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
_index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1");
_scan_timer = ADD_TIMER(_scanner_profile, "ScanTime");
+ _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime");
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
@@ -290,6 +291,7 @@ Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->add_scan_bytes(_read_compressed_counter->value());
statistics->add_scan_rows(_raw_rows_counter->value());
+ statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
return Status::OK();
}
@@ -1308,6 +1310,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
+ SCOPED_CPU_TIMER(_scan_cpu_timer);
Status status = Status::OK();
bool eos = false;
RuntimeState* state = scanner->runtime_state();
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index d0fa54e..0f5cbb6 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -244,6 +244,7 @@ private:
Status _status;
RuntimeState* _runtime_state;
RuntimeProfile::Counter* _scan_timer;
+ RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
RuntimeProfile::Counter* _tablet_counter;
RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr;
RuntimeProfile::Counter* _reader_init_timer = nullptr;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 20a4dd7..e29ee96 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -218,6 +218,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
// set up profile counters
profile()->add_child(_plan->runtime_profile(), true, NULL);
_rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
+ _fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
_row_batch.reset(new RowBatch(_plan->row_desc(), _runtime_state->batch_size(),
_runtime_state->instance_mem_tracker().get()));
@@ -264,6 +265,7 @@ Status PlanFragmentExecutor::open() {
Status PlanFragmentExecutor::open_internal() {
{
+ SCOPED_CPU_TIMER(_fragment_cpu_timer);
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
}
@@ -271,14 +273,19 @@ Status PlanFragmentExecutor::open_internal() {
if (_sink.get() == NULL) {
return Status::OK();
}
- RETURN_IF_ERROR(_sink->open(runtime_state()));
+ {
+ SCOPED_CPU_TIMER(_fragment_cpu_timer);
+ RETURN_IF_ERROR(_sink->open(runtime_state()));
+ }
// If there is a sink, do all the work of driving it here, so that
// when this returns the query has actually finished
RowBatch* batch = NULL;
-
while (true) {
- RETURN_IF_ERROR(get_next_internal(&batch));
+ {
+ SCOPED_CPU_TIMER(_fragment_cpu_timer);
+ RETURN_IF_ERROR(get_next_internal(&batch));
+ }
if (batch == NULL) {
break;
@@ -295,9 +302,10 @@ Status PlanFragmentExecutor::open_internal() {
}
SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_CPU_TIMER(_fragment_cpu_timer);
// Collect this plan and sub plan statistics, and send to parent plan.
if (_collect_query_statistics_with_every_batch) {
- collect_query_statistics();
+ _collect_query_statistics();
}
RETURN_IF_ERROR(_sink->send(runtime_state(), batch));
}
@@ -315,7 +323,7 @@ Status PlanFragmentExecutor::open_internal() {
// audit the sinks to check that this is ok, or change that behaviour.
{
SCOPED_TIMER(profile()->total_time_counter());
- collect_query_statistics();
+ _collect_query_statistics();
Status status;
{
boost::lock_guard<boost::mutex> l(_status_lock);
@@ -337,9 +345,10 @@ Status PlanFragmentExecutor::open_internal() {
return Status::OK();
}
-void PlanFragmentExecutor::collect_query_statistics() {
+void PlanFragmentExecutor::_collect_query_statistics() {
_query_statistics->clear();
_plan->collect_query_statistics(_query_statistics.get());
+ _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
}
void PlanFragmentExecutor::report_profile() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index e2406fe..37f544a 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -28,6 +28,7 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/hash_util.hpp"
+#include "util/time.h"
namespace doris {
@@ -205,6 +206,8 @@ private:
// Number of rows returned by this fragment
RuntimeProfile::Counter* _rows_produced_counter;
+ RuntimeProfile::Counter* _fragment_cpu_timer;
+
// Average number of thread tokens for the duration of the plan fragment execution.
// Fragments that do a lot of cpu work (non-coordinator fragment) will have at
// least 1 token. Fragments that contain a hdfs scan node will have 1+ tokens
@@ -257,7 +260,7 @@ private:
const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); }
- void collect_query_statistics();
+ void _collect_query_statistics();
};
// Save the common components of fragments in a query.
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index 09ef828..92234e8 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -32,17 +32,20 @@ class QueryStatisticsRecvr;
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
class QueryStatistics {
public:
- QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) {}
+ QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0) {}
void merge(const QueryStatistics& other) {
scan_rows += other.scan_rows;
scan_bytes += other.scan_bytes;
+ cpu_ms += other.cpu_ms;
}
void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; }
void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; }
+ void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; }
+
void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
void merge(QueryStatisticsRecvr* recvr);
@@ -50,6 +53,7 @@ public:
void clear() {
scan_rows = 0;
scan_bytes = 0;
+ cpu_ms = 0;
returned_rows = 0;
}
@@ -57,17 +61,20 @@ public:
DCHECK(statistics != nullptr);
statistics->set_scan_rows(scan_rows);
statistics->set_scan_bytes(scan_bytes);
+ statistics->set_cpu_ms(cpu_ms);
statistics->set_returned_rows(returned_rows);
}
void merge_pb(const PQueryStatistics& statistics) {
scan_rows += statistics.scan_rows();
scan_bytes += statistics.scan_bytes();
+ cpu_ms += statistics.cpu_ms();
}
private:
int64_t scan_rows;
int64_t scan_bytes;
+ int64_t cpu_ms;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 46659c4..2bd2da7 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -48,6 +48,7 @@ namespace doris {
#define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS)
#define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent)
#define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
+#define SCOPED_CPU_TIMER(c) ScopedTimer<ThreadCpuStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
#define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
#define SCOPED_RAW_TIMER(c) \
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 1d52d86..53905a5 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -29,16 +29,17 @@ namespace doris {
// it is not affected by user setting the system clock.
// CLOCK_MONOTONIC represents monotonic time since some unspecified starting point.
// It is good for computing elapsed time.
-class MonotonicStopWatch {
+template <clockid_t Clock>
+class CustomStopWatch {
public:
- MonotonicStopWatch() {
+ CustomStopWatch() {
_total_time = 0;
_running = false;
}
void start() {
if (!_running) {
- clock_gettime(CLOCK_MONOTONIC, &_start);
+ clock_gettime(Clock, &_start);
_running = true;
}
}
@@ -55,7 +56,7 @@ public:
uint64_t ret = elapsed_time();
if (_running) {
- clock_gettime(CLOCK_MONOTONIC, &_start);
+ clock_gettime(Clock, &_start);
}
return ret;
@@ -68,7 +69,7 @@ public:
}
timespec end;
- clock_gettime(CLOCK_MONOTONIC, &end);
+ clock_gettime(Clock, &end);
return (end.tv_sec - _start.tv_sec) * 1000L * 1000L * 1000L +
(end.tv_nsec - _start.tv_nsec);
}
@@ -79,6 +80,17 @@ private:
bool _running;
};
+// Stop watch for reporting elapsed time in nanosec based on CLOCK_MONOTONIC.
+// It is as fast as Rdtsc.
+// It is also accurate because it is not affected by cpu frequency changes and
+// it is not affected by user setting the system clock.
+// CLOCK_MONOTONIC represents monotonic time since some unspecified starting point.
+// It is good for computing elapsed time.
+using MonotonicStopWatch = CustomStopWatch<CLOCK_MONOTONIC>;
+
+// Stop watch for reporting the calling thread's CPU time in nanosec, based on CLOCK_THREAD_CPUTIME_ID.
+using ThreadCpuStopWatch = CustomStopWatch<CLOCK_THREAD_CPUTIME_ID>;
+
}
#endif
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 26e54ea..f54dc82 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -24,6 +24,7 @@
#include <string>
#define NANOS_PER_SEC 1000000000ll
+#define NANOS_PER_MILLIS 1000000ll
#define NANOS_PER_MICRO 1000ll
#define MICROS_PER_SEC 1000000ll
#define MICROS_PER_MILLI 1000ll
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index ebaf3da..1e9f9a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -75,6 +75,8 @@ public class AuditEvent {
public String feIp = "";
@AuditField(value = "Stmt")
public String stmt = "";
+ @AuditField(value = "CpuTimeMS")
+ public long cpuTimeMs = -1;
public static class AuditEventBuilder {
@@ -127,6 +129,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setCpuTimeMs(long cpuTimeMs) {
+ auditEvent.cpuTimeMs = cpuTimeMs;
+ return this;
+ }
+
public AuditEventBuilder setScanRows(long scanRows) {
auditEvent.scanRows = scanRows;
return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 993de8e..e32cbee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -115,6 +115,7 @@ public class ConnectProcessor {
.setState(ctx.getState().toString()).setQueryTime(elapseMs)
.setScanBytes(statistics == null ? 0 : statistics.scan_bytes)
.setScanRows(statistics == null ? 0 : statistics.scan_rows)
+ .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms)
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c894048..b176dc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1125,6 +1125,9 @@ public class StmtExecutor {
if (statisticsForAuditLog.scan_rows == null) {
statisticsForAuditLog.scan_rows = 0L;
}
+ if (statisticsForAuditLog.cpu_ms == null) {
+ statisticsForAuditLog.cpu_ms = 0L;
+ }
return statisticsForAuditLog;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index 11bf6f2..7421608 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -100,6 +100,7 @@ public class ConnectProcessorTest {
statistics.scan_bytes = 0L;
statistics.scan_rows = 0L;
+ statistics.cpu_ms = 0L;
MetricRepo.init();
}
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 09af10c..d380031 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -24,6 +24,7 @@ message PQueryStatistics {
optional int64 scan_rows = 1;
optional int64 scan_bytes = 2;
optional int64 returned_rows = 3;
+ optional int64 cpu_ms = 4;
}
message PRowBatch {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org