You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/25 21:46:29 UTC
[11/11] impala git commit: IMPALA-6190/6246: Add instances tab and
event sequence
IMPALA-6190/6246: Add instances tab and event sequence
This change adds tracking of the current state during the execution of a
fragment instance. The current state is then reported back to the
coordinator and exposed to users via a new tab in the query detail debug
webpage.
This change also adds an event timeline to fragment instances in the
query profile. The timeline measures the time since backend-local query
start at which particular events complete. Events are derived from the
current state of the execution of a fragment instance. For example:
- Prepare Finished: 13.436ms (13.436ms)
- First Batch Produced: 1s022ms (1s008ms)
- First Batch Sent: 1s022ms (455.558us)
- ExecInternal Finished: 2s783ms (1s760ms)
I added automated tests for both extensions and additionally verified
the change by manual inspection.
Here are the TPCH performance comparison results between this change and
the previous commit on a 16-node cluster.
+------------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+------------+-----------------------+---------+------------+------------+----------------+
| TPCH(_300) | parquet / none / none | 18.47 | -0.94% | 9.72 | -1.08% |
+------------+-----------------------+---------+------------+------------+----------------+
+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| TPCH(_300) | TPCH-Q5 | parquet / none / none | 48.88 | 46.93 | +4.15% | 0.14% | 3.61% | 1 | 3 |
| TPCH(_300) | TPCH-Q13 | parquet / none / none | 21.64 | 21.15 | +2.29% | 2.06% | 1.84% | 1 | 3 |
| TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.71 | 1.70 | +1.12% | 0.54% | 2.51% | 1 | 3 |
| TPCH(_300) | TPCH-Q18 | parquet / none / none | 33.15 | 32.79 | +1.09% | 0.13% | 2.03% | 1 | 3 |
| TPCH(_300) | TPCH-Q14 | parquet / none / none | 5.95 | 5.90 | +0.82% | 2.19% | 0.49% | 1 | 3 |
| TPCH(_300) | TPCH-Q1 | parquet / none / none | 13.99 | 13.90 | +0.63% | 0.25% | 1.39% | 1 | 3 |
| TPCH(_300) | TPCH-Q2 | parquet / none / none | 3.44 | 3.44 | +0.00% | * 20.29% * | * 20.76% * | 1 | 3 |
| TPCH(_300) | TPCH-Q6 | parquet / none / none | 1.21 | 1.22 | -0.01% | 0.06% | 0.06% | 1 | 3 |
| TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.51 | 3.51 | -0.11% | 7.15% | 7.30% | 1 | 3 |
| TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.89 | 6.91 | -0.21% | 0.65% | 0.55% | 1 | 3 |
| TPCH(_300) | TPCH-Q4 | parquet / none / none | 4.78 | 4.80 | -0.38% | 0.06% | 0.59% | 1 | 3 |
| TPCH(_300) | TPCH-Q19 | parquet / none / none | 30.78 | 31.04 | -0.83% | 0.45% | 1.03% | 1 | 3 |
| TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.06 | 6.12 | -1.02% | 1.51% | 2.12% | 1 | 3 |
| TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.43 | 9.58 | -1.54% | 0.69% | 3.30% | 1 | 3 |
| TPCH(_300) | TPCH-Q21 | parquet / none / none | 93.41 | 95.18 | -1.86% | 0.08% | 0.81% | 1 | 3 |
| TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.40 | 3.47 | -1.99% | 0.72% | 1.27% | 1 | 3 |
| TPCH(_300) | TPCH-Q7 | parquet / none / none | 44.98 | 46.24 | -2.71% | 1.83% | 1.27% | 1 | 3 |
| TPCH(_300) | TPCH-Q3 | parquet / none / none | 28.06 | 29.11 | -3.61% | 1.62% | 1.23% | 1 | 3 |
| TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.15 | 3.28 | -3.80% | 0.96% | 1.32% | 1 | 3 |
| TPCH(_300) | TPCH-Q9 | parquet / none / none | 29.47 | 30.80 | -4.30% | 0.29% | 0.34% | 1 | 3 |
| TPCH(_300) | TPCH-Q17 | parquet / none / none | 4.37 | 4.62 | -5.33% | 0.63% | 0.54% | 1 | 3 |
| TPCH(_300) | TPCH-Q8 | parquet / none / none | 7.99 | 8.46 | -5.53% | 7.95% | 1.11% | 1 | 3 |
+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
Here are the TPCDS performance comparison results between this change
and the previous commit on a 16-node cluster. I inspected the Q2 results
and concluded that the variability is unrelated to this change.
+--------------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+--------------+-----------------------+---------+------------+------------+----------------+
| TPCDS(_1000) | parquet / none / none | 13.07 | +0.51% | 4.27 | +1.83% |
+--------------+-----------------------+---------+------------+------------+----------------+
+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| TPCDS(_1000) | TPCDS-Q2 | parquet / none / none | 8.36 | 4.25 | R +96.81% | * 48.88% * | 0.42% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q8 | parquet / none / none | 1.59 | 1.35 | +17.86% | * 13.91% * | 4.01% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q73 | parquet / none / none | 1.81 | 1.71 | +5.92% | 5.53% | 0.15% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q28 | parquet / none / none | 7.26 | 6.95 | +4.47% | 1.09% | 1.11% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q46 | parquet / none / none | 2.36 | 2.30 | +2.62% | 1.45% | 0.40% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q7 | parquet / none / none | 2.78 | 2.73 | +1.98% | 1.21% | 2.23% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q55 | parquet / none / none | 1.05 | 1.03 | +1.91% | 1.16% | 2.20% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q42 | parquet / none / none | 1.05 | 1.04 | +1.71% | 0.90% | 2.63% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q19 | parquet / none / none | 1.67 | 1.65 | +1.55% | 1.12% | 1.96% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q23 | parquet / none / none | 151.75 | 149.94 | +1.20% | 3.23% | 1.83% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q64 | parquet / none / none | 40.25 | 39.79 | +1.16% | 0.43% | 0.28% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q96 | parquet / none / none | 2.25 | 2.22 | +1.05% | 1.00% | 0.11% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q53 | parquet / none / none | 1.60 | 1.58 | +1.01% | 1.28% | 0.04% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q79 | parquet / none / none | 4.17 | 4.13 | +0.94% | 0.89% | 0.06% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q59 | parquet / none / none | 5.74 | 5.71 | +0.60% | 1.22% | 2.56% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q52 | parquet / none / none | 0.89 | 0.89 | +0.14% | 0.03% | 0.63% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q88 | parquet / none / none | 7.10 | 7.12 | -0.23% | 0.43% | 0.47% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q3 | parquet / none / none | 1.10 | 1.11 | -0.40% | 0.58% | 0.36% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q98 | parquet / none / none | 2.30 | 2.31 | -0.49% | 3.58% | 1.04% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q61 | parquet / none / none | 1.87 | 1.89 | -1.08% | 1.68% | 0.14% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q27a | parquet / none / none | 2.93 | 2.96 | -1.18% | 1.74% | 1.54% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q34 | parquet / none / none | 2.23 | 2.27 | -1.73% | 1.91% | 1.32% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q63 | parquet / none / none | 1.56 | 1.60 | -1.96% | 1.91% | 3.33% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q89 | parquet / none / none | 2.64 | 2.70 | -2.20% | 1.93% | 1.88% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q47 | parquet / none / none | 30.41 | 31.17 | -2.41% | 1.09% | 1.52% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q1 | parquet / none / none | 3.77 | 3.86 | -2.46% | 1.91% | 0.61% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q6 | parquet / none / none | 61.67 | 63.34 | -2.65% | 3.77% | 0.31% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q4 | parquet / none / none | 31.11 | 31.96 | -2.66% | 0.61% | 0.77% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q43 | parquet / none / none | 4.10 | 4.22 | -2.87% | 1.40% | 2.85% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q5 | parquet / none / none | 8.30 | 8.56 | -3.13% | 1.55% | 0.47% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q27 | parquet / none / none | 2.28 | 2.35 | -3.13% | 1.17% | 1.56% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q65 | parquet / none / none | 31.74 | 32.77 | -3.15% | 1.47% | 1.11% | 1 | 3 |
| TPCDS(_1000) | TPCDS-Q68 | parquet / none / none | 1.56 | 1.62 | -3.58% | 9.37% | * 11.93% * | 1 | 3 |
+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
(R) Regression: TPCDS(_1000) TPCDS-Q2 [parquet / none / none] (4.25s -> 8.36s [+96.81%])
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+
| Operator | % of Query | Avg | Base Avg | Delta(Avg) | StdDev(%) | Max | Base Max | Delta(Max) | #Hosts | #Rows | Est #Rows |
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+
| 27:MERGING-EXCHANGE | 22.48% | 6.97s | 2.85s | +144.40% | * 58.44% * | 11.05s | 2.86s | +286.33% | 1 | 2.51K | 2.56K |
| 26:EXCHANGE | 7.65% | 2.37s | 2.43s | -2.16% | 1.82% | 2.46s | 2.50s | -1.65% | 14 | 365 | 2.56K |
| 23:EXCHANGE | 8.58% | 2.66s | 2.70s | -1.46% | 1.67% | 2.74s | 2.78s | -1.47% | 14 | 516 | 10.64K |
| 13:AGGREGATE | 4.21% | 1.31s | 1.30s | +0.65% | 0.06% | 1.47s | 1.43s | +2.38% | 14 | 516 | 10.64K |
| 12:HASH JOIN | 2.89% | 896.20ms | 885.79ms | +1.17% | 1.43% | 1.06s | 1.01s | +4.77% | 14 | 433.27M | 2.16B |
| 06:SCAN HDFS | 2.83% | 877.34ms | 886.93ms | -1.08% | 1.23% | 888.16ms | 906.88ms | -2.06% | 1 | 365 | 373 |
| 19:EXCHANGE | 23.20% | 7.20s | 3.12s | +130.58% | * 56.73% * | 11.33s | 3.17s | +256.92% | 14 | 520 | 10.64K |
| 05:AGGREGATE | 12.06% | 3.74s | 1.34s | +178.49% | * 64.53% * | 6.33s | 1.53s | +314.84% | 14 | 520 | 10.64K |
| 04:HASH JOIN | 7.71% | 2.39s | 956.81ms | +149.90% | * 60.36% * | 4.04s | 1.13s | +256.75% | 14 | 442.29M | 2.16B |
| 03:SCAN HDFS | 2.83% | 878.97ms | 894.11ms | -1.69% | 1.34% | 890.78ms | 910.22ms | -2.14% | 1 | 371 | 73.05K |
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+
Change-Id: I626456b6afa9101eeeeffd5cda10c4096d63d7f9
Reviewed-on: http://gerrit.cloudera.org:8080/8758
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/057cc51b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/057cc51b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/057cc51b
Branch: refs/heads/2.x
Commit: 057cc51b54fffbeb800a8f3f819fed1f5bbf32d1
Parents: cf2e482
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Nov 28 15:08:29 2017 -0800
Committer: Philip Zeyliger <ph...@cloudera.com>
Committed: Wed Jan 24 10:17:57 2018 -0800
----------------------------------------------------------------------
be/src/common/atomic.h | 17 +++
be/src/runtime/coordinator-backend-state.cc | 46 ++++++++
be/src/runtime/coordinator-backend-state.h | 26 ++++-
be/src/runtime/coordinator.cc | 29 +++--
be/src/runtime/coordinator.h | 4 +
be/src/runtime/fragment-instance-state.cc | 130 ++++++++++++++++++++---
be/src/runtime/fragment-instance-state.h | 54 +++++++++-
be/src/runtime/query-state.cc | 2 +
be/src/runtime/query-state.h | 7 +-
be/src/service/impala-http-handler.cc | 22 ++++
be/src/service/impala-http-handler.h | 5 +
be/src/util/runtime-profile-counters.h | 69 ++++++++----
be/src/util/runtime-profile.cc | 21 +++-
be/src/util/stopwatch.h | 30 +++---
common/thrift/ImpalaInternalService.thrift | 21 +++-
tests/query_test/test_observability.py | 13 +++
tests/webserver/test_web_pages.py | 49 ++++++---
www/query_detail_tabs.tmpl | 1 +
www/query_finstances.tmpl | 129 ++++++++++++++++++++++
19 files changed, 594 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 791f3b2..3925137 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -132,6 +132,23 @@ class AtomicPtr {
internal::AtomicInt<intptr_t> ptr_;
};
+/// Atomic enum. Operations have the same semantics as AtomicInt.
+template<typename T>
+class AtomicEnum {
+ static_assert(std::is_enum<T>::value, "Type must be enum");
+ static_assert(sizeof(typename std::underlying_type<T>::type) <= sizeof(int32_t),
+ "Underlying enum type must fit into 4 bytes");
+
+ public:
+ /// Atomic load with "acquire" memory-ordering semantic.
+ ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); }
+
+ /// Atomic store with "release" memory-ordering semantic.
+ ALWAYS_INLINE void Store(T val) { enum_.Store(val); }
+
+ private:
+ internal::AtomicInt<int32_t> enum_;
+};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 9d89086..914a3e4 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -453,6 +453,7 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
void Coordinator::BackendState::InstanceStats::Update(
const TFragmentInstanceExecStatus& exec_status,
ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
+ last_report_time_ms_ = MonotonicMillis();
if (exec_status.done) stopwatch_.Stop();
profile_->Update(exec_status.profile);
if (!profile_created_) {
@@ -496,6 +497,32 @@ void Coordinator::BackendState::InstanceStats::Update(
int64_t delta = total - total_ranges_complete_;
total_ranges_complete_ = total;
scan_range_progress->Update(delta);
+
+ // extract the current execution state of this instance
+ current_state_ = exec_status.current_state;
+}
+
+void Coordinator::BackendState::InstanceStats::ToJson(Value* value, Document* document) {
+ Value instance_id_val(PrintId(exec_params_.instance_id).c_str(),
+ document->GetAllocator());
+ value->AddMember("instance_id", instance_id_val, document->GetAllocator());
+
+ // We send 'done' explicitly so we don't have to infer it by comparison with a string
+ // constant in the debug page JS code.
+ value->AddMember("done", done_, document->GetAllocator());
+
+ Value state_val(FragmentInstanceState::ExecStateToString(current_state_).c_str(),
+ document->GetAllocator());
+ value->AddMember("current_state", state_val, document->GetAllocator());
+
+ Value fragment_name_val(exec_params_.fragment().display_name.c_str(),
+ document->GetAllocator());
+ value->AddMember("fragment_name", fragment_name_val, document->GetAllocator());
+
+ value->AddMember("first_status_update_received", last_report_time_ms_ > 0,
+ document->GetAllocator());
+ value->AddMember("time_since_last_heard_from", MonotonicMillis() - last_report_time_ms_,
+ document->GetAllocator());
}
Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
@@ -583,3 +610,22 @@ void Coordinator::BackendState::ToJson(Value* value, Document* document) {
value->AddMember(
"num_remaining_instances", num_remaining_instances_, document->GetAllocator());
}
+
+void Coordinator::BackendState::InstanceStatsToJson(Value* value, Document* document) {
+ Value instance_stats(kArrayType);
+ {
+ lock_guard<mutex> l(lock_);
+ for (const auto& elem : instance_stats_map_) {
+ Value val(kObjectType);
+ elem.second->ToJson(&val, document);
+ instance_stats.PushBack(val, document->GetAllocator());
+ }
+ DCHECK_EQ(instance_stats.Size(), fragments_.size());
+ }
+ value->AddMember("instance_stats", instance_stats, document->GetAllocator());
+
+ // impalad_address is not protected by lock_. The lifetime of the backend state is
+ // protected by Coordinator::lock_.
+ Value val(TNetworkAddressToString(impalad_address()).c_str(), document->GetAllocator());
+ value->AddMember("host", val, document->GetAllocator());
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 73acef9..0973ca3 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -124,6 +124,10 @@ class Coordinator::BackendState {
/// number of instances, peak memory consumption, host and status amongst others.
void ToJson(rapidjson::Value* value, rapidjson::Document* doc);
+ /// Serializes the InstanceStats of all instances of this backend state to JSON by
+ /// adding members to 'value', including the remote host name.
+ void InstanceStatsToJson(rapidjson::Value* value, rapidjson::Document* doc);
+
private:
/// Execution stats for a single fragment instance.
/// Not thread-safe.
@@ -132,9 +136,9 @@ class Coordinator::BackendState {
InstanceStats(const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
ObjectPool* obj_pool);
- /// Update 'this' with exec_status, the fragment instances' TExecStats in
- /// exec_summary, and 'progress_updater' with the number of
- /// newly completed scan ranges. Also updates the instance's avg profile.
+ /// Updates 'this' with exec_status, the fragment instances' TExecStats in
+ /// exec_summary, and 'progress_updater' with the number of newly completed scan
+ /// ranges. Also updates the instance's avg profile.
void Update(const TFragmentInstanceExecStatus& exec_status,
ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
@@ -142,12 +146,20 @@ class Coordinator::BackendState {
return exec_params_.per_fragment_instance_idx;
}
+ /// Serializes instance stats to JSON by adding members to 'value', including its
+ /// instance id, plan fragment name, and the last event that was recorded during
+ /// execution of the instance.
+ void ToJson(rapidjson::Value* value, rapidjson::Document* doc);
+
private:
friend class BackendState;
/// query lifetime
const FInstanceExecParams& exec_params_;
+ /// Set in Update(). Uses MonotonicMillis().
+ int64_t last_report_time_ms_ = 0;
+
/// owned by coordinator object pool provided in the c'tor, created in Update()
RuntimeProfile* profile_;
@@ -172,9 +184,13 @@ class Coordinator::BackendState {
std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
/// PER_HOST_PEAK_MEM_COUNTER
- RuntimeProfile::Counter* peak_mem_counter_;
+ RuntimeProfile::Counter* peak_mem_counter_ = nullptr;
+
+ /// The current state of this fragment instance's execution. This gets serialized in
+ /// ToJson() and is displayed in the debug webpages.
+ TFInstanceExecState::type current_state_ = TFInstanceExecState::WAITING_FOR_EXEC;
- /// Extract scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
+ /// Extracts scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
void InitCounters();
};
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 85ff810..7973775 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -459,8 +459,8 @@ Status Coordinator::GetStatus() {
return query_status_;
}
- Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
- bool is_fragment_failure, const TUniqueId& instance_id) {
+Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+ bool is_fragment_failure, const TUniqueId& instance_id) {
{
lock_guard<mutex> l(lock_);
@@ -1258,13 +1258,28 @@ MemTracker* Coordinator::query_mem_tracker() const {
}
void Coordinator::BackendsToJson(Document* doc) {
- lock_guard<mutex> l(lock_);
Value states(kArrayType);
- for (BackendState* state : backend_states_) {
- Value val(kObjectType);
- state->ToJson(&val, doc);
- states.PushBack(val, doc->GetAllocator());
+ {
+ lock_guard<mutex> l(lock_);
+ for (BackendState* state : backend_states_) {
+ Value val(kObjectType);
+ state->ToJson(&val, doc);
+ states.PushBack(val, doc->GetAllocator());
+ }
}
doc->AddMember("backend_states", states, doc->GetAllocator());
}
+
+void Coordinator::FInstanceStatsToJson(Document* doc) {
+ Value states(kArrayType);
+ {
+ lock_guard<mutex> l(lock_);
+ for (BackendState* state : backend_states_) {
+ Value val(kObjectType);
+ state->InstanceStatsToJson(&val, doc);
+ states.PushBack(val, doc->GetAllocator());
+ }
+ }
+ doc->AddMember("backend_instances", states, doc->GetAllocator());
+}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e7ddee9..d630b9a 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -183,6 +183,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// 'backend_states'.
void BackendsToJson(rapidjson::Document* document);
+ /// Adds to 'document' a serialized array of all backend names and stats of all fragment
+ /// instances running on each backend in a member named 'backend_instances'.
+ void FInstanceStatsToJson(rapidjson::Document* document);
+
private:
class BackendState;
struct FilterTarget;
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 5c9deb8..16b4a7e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -61,7 +61,6 @@ static const string OPEN_TIMER_NAME = "OpenTime";
static const string PREPARE_TIMER_NAME = "PrepareTime";
static const string EXEC_TIMER_NAME = "ExecTime";
-
FragmentInstanceState::FragmentInstanceState(
QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& instance_ctx)
@@ -91,6 +90,7 @@ Status FragmentInstanceState::Exec() {
}
done:
+ UpdateState(StateEvent::EXEC_END);
// call this before Close() to make sure the thread token got released
Finalize(status);
Close();
@@ -114,7 +114,6 @@ void FragmentInstanceState::Cancel() {
Status FragmentInstanceState::Prepare() {
DCHECK(!prepared_promise_.IsSet());
-
VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
// Do not call RETURN_IF_ERROR or explicitly return before this line,
@@ -129,11 +128,17 @@ Status FragmentInstanceState::Prepare() {
profile()->AddChild(timings_profile_);
SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
+ // Events that are tracked in a separate timeline for each fragment instance, relative
+ // to the startup of the query state.
+ event_sequence_ =
+ profile()->AddEventSequence("Fragment Instance Lifecycle Event Timeline");
+ event_sequence_->Start(query_state_->fragment_events_start_time());
+ UpdateState(StateEvent::PREPARE_START);
+
runtime_state_->InitFilterBank();
// Reserve one main thread from the pool
runtime_state_->resource_pool()->AcquireThreadToken();
-
avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
runtime_state_->resource_pool()));
@@ -201,15 +206,6 @@ Status FragmentInstanceState::Prepare() {
ReleaseThreadToken();
}
- if (runtime_state_->ShouldCodegen()) {
- RETURN_IF_ERROR(runtime_state_->CreateCodegen());
- exec_tree_->Codegen(runtime_state_);
- // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
- // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
- // the error status for now.
- RETURN_IF_ERROR(runtime_state_->CodegenScalarFns());
- }
-
// set up profile counters
profile()->AddChild(exec_tree_->runtime_profile());
rows_produced_counter_ =
@@ -247,14 +243,22 @@ Status FragmentInstanceState::Open() {
SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
- // codegen prior to exec_tree_->Open()
if (runtime_state_->ShouldCodegen()) {
+ UpdateState(StateEvent::CODEGEN_START);
+ RETURN_IF_ERROR(runtime_state_->CreateCodegen());
+ exec_tree_->Codegen(runtime_state_);
+ // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
+ // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
+ // the error status for now.
+ RETURN_IF_ERROR(runtime_state_->CodegenScalarFns());
+
LlvmCodeGen* codegen = runtime_state_->codegen();
DCHECK(codegen != nullptr);
RETURN_IF_ERROR(codegen->FinalizeModule());
}
{
+ UpdateState(StateEvent::OPEN_START);
SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
}
@@ -266,6 +270,7 @@ Status FragmentInstanceState::ExecInternal() {
ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
bool exec_tree_complete = false;
+ UpdateState(StateEvent::WAITING_FOR_FIRST_BATCH);
do {
Status status;
row_batch_->Reset();
@@ -274,11 +279,14 @@ Status FragmentInstanceState::ExecInternal() {
RETURN_IF_ERROR(
exec_tree_->GetNext(runtime_state_, row_batch_.get(), &exec_tree_complete));
}
+ UpdateState(StateEvent::BATCH_PRODUCED);
if (VLOG_ROW_IS_ON) row_batch_->VLogRows("FragmentInstanceState::ExecInternal()");
COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get()));
+ UpdateState(StateEvent::BATCH_SENT);
} while (!exec_tree_complete);
+ UpdateState(StateEvent::LAST_BATCH_SENT);
// Flush the sink *before* stopping the report thread. Flush may need to add some
// important information to the last report that gets sent. (e.g. table sinks record the
// files they have written to in this method)
@@ -374,6 +382,79 @@ void FragmentInstanceState::SendReport(bool done, const Status& status) {
query_state_->ReportExecStatus(done, status, this);
}
+void FragmentInstanceState::UpdateState(const StateEvent event)
+{
+ TFInstanceExecState::type current_state = current_state_.Load();
+ TFInstanceExecState::type next_state = current_state;
+ switch (event) {
+ case StateEvent::PREPARE_START:
+ DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_EXEC);
+ next_state = TFInstanceExecState::WAITING_FOR_PREPARE;
+ break;
+
+ case StateEvent::CODEGEN_START:
+ DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_PREPARE);
+ event_sequence_->MarkEvent("Prepare Finished");
+ next_state = TFInstanceExecState::WAITING_FOR_CODEGEN;
+ break;
+
+ case StateEvent::OPEN_START:
+ if (current_state == TFInstanceExecState::WAITING_FOR_PREPARE) {
+ event_sequence_->MarkEvent("Prepare Finished");
+ } else {
+ DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_CODEGEN);
+ }
+ next_state = TFInstanceExecState::WAITING_FOR_OPEN;
+ break;
+
+ case StateEvent::WAITING_FOR_FIRST_BATCH:
+ DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_OPEN);
+ next_state = TFInstanceExecState::WAITING_FOR_FIRST_BATCH;
+ break;
+
+ case StateEvent::BATCH_PRODUCED:
+ if (UNLIKELY(current_state == TFInstanceExecState::WAITING_FOR_FIRST_BATCH)) {
+ event_sequence_->MarkEvent("First Batch Produced");
+ next_state = TFInstanceExecState::FIRST_BATCH_PRODUCED;
+ } else {
+ DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+ }
+ break;
+
+ case StateEvent::BATCH_SENT:
+ if (UNLIKELY(current_state == TFInstanceExecState::FIRST_BATCH_PRODUCED)) {
+ event_sequence_->MarkEvent("First Batch Sent");
+ next_state = TFInstanceExecState::PRODUCING_DATA;
+ } else {
+ DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+ }
+ break;
+
+ case StateEvent::LAST_BATCH_SENT:
+ if (UNLIKELY(current_state == TFInstanceExecState::WAITING_FOR_OPEN)) {
+ event_sequence_->MarkEvent("Open Finished");
+ } else {
+ DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+ }
+ next_state = TFInstanceExecState::LAST_BATCH_SENT;
+ break;
+
+ case StateEvent::EXEC_END:
+ // Allow abort in all states to make error handling easier.
+ event_sequence_->MarkEvent("ExecInternal Finished");
+ next_state = TFInstanceExecState::FINISHED;
+ break;
+
+ default:
+ DCHECK(false) << "Unexpected Event: " << static_cast<int>(event);
+ break;
+ }
+ // current_state_ is an AtomicEnum to add memory barriers for concurrent reads by the
+ // profile reporting thread. This method is the only one updating it and is not
+ // meant to be thread safe.
+ if (next_state != current_state) current_state_.Store(next_state);
+}
+
void FragmentInstanceState::StopReportThread() {
if (!report_thread_active_) return;
{
@@ -426,6 +507,29 @@ void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
runtime_state_->filter_bank()->PublishGlobalFilter(params);
}
+string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type state) {
+ // Labels to send to the debug webpages to display the current state to the user.
+ static const string finstance_state_labels[] = {
+ "Waiting for Exec", // WAITING_FOR_EXEC
+ "Waiting for Codegen", // WAITING_FOR_CODEGEN
+ "Waiting for Prepare", // WAITING_FOR_PREPARE
+ "Waiting for First Batch", // WAITING_FOR_OPEN
+ "Waiting for First Batch", // WAITING_FOR_FIRST_BATCH
+ "First batch produced", // FIRST_BATCH_PRODUCED
+ "Producing Data", // PRODUCING_DATA
+ "Last batch sent", // LAST_BATCH_SENT
+ "Finished" // FINISHED
+ };
+ /// Make sure we have a label for every possible state.
+ static_assert(
+ sizeof(finstance_state_labels) / sizeof(char*) == TFInstanceExecState::FINISHED + 1,
+ "");
+
+ DCHECK_LT(state, sizeof(finstance_state_labels) / sizeof(char*))
+ << "Unknown instance state";
+ return finstance_state_labels[state];
+}
+
const TQueryCtx& FragmentInstanceState::query_ctx() const {
return query_state_->query_ctx();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 4e832f6..292b93c 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -23,6 +23,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
+#include "common/atomic.h"
#include "common/status.h"
#include "util/promise.h"
@@ -54,12 +55,14 @@ class RuntimeState;
/// for this fragment instance and closes all data streams.
///
/// The FIS makes an aggregated profile for the entire fragment available, which
-/// includes profile information for the plan itself as well as the output sink.
+/// includes profile information for the plan itself as well as the output sink. It also
+/// contains a timeline of events of the fragment instance.
/// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the
-/// execution status and profile. The frequency of those reports is controlled by the flag
-/// status_report_interval; setting that flag to 0 disables periodic reporting altogether
-/// Regardless of the value of that flag, a report is sent at least once at the end of
-/// execution with an overall status and profile (and 'done' indicator).
+/// execution status, the current state of the execution, and the instance profile. The
+/// frequency of those reports is controlled by the flag status_report_interval; setting
+/// that flag to 0 disables periodic reporting altogether. Regardless of the value of
+/// flag, a report is sent at least once at the end of execution with an overall status
+/// and profile (and 'done' indicator).
/// The FIS will send at least one final status report. If execution ended with an error,
/// that error status will be part of the final report (it will not be overridden by
/// the resulting cancellation).
@@ -105,6 +108,9 @@ class FragmentInstanceState {
/// the Prepare phase. May be nullptr.
PlanRootSink* root_sink() { return root_sink_; }
+ /// Returns a string description of 'current_state_'.
+ static string ExecStateToString(const TFInstanceExecState::type state);
+
/// Name of the counter that is tracking per query, per host peak mem usage.
/// TODO: this doesn't look like it belongs here
static const std::string PER_HOST_PEAK_MEM_COUNTER;
@@ -117,6 +123,7 @@ class FragmentInstanceState {
const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
const TUniqueId& query_id() const { return query_ctx().query_id; }
const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
+ TFInstanceExecState::type current_state() const { return current_state_.Load(); }
const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
ObjectPool* obj_pool();
@@ -155,6 +162,37 @@ class FragmentInstanceState {
/// Lives in obj_pool().
RuntimeProfile* timings_profile_ = nullptr;
+ /// Event sequence tracking the completion of various stages of this fragment instance.
+ /// Updated in UpdateState().
+ RuntimeProfile::EventSequence* event_sequence_ = nullptr;
+
+ /// Events that change the current state of this instance's execution, which is kept in
+ /// 'current_state_'. Events are issued throughout the execution by calling
+ /// UpdateState(), which implements a state machine. See the implementation of
+ /// UpdateState() for valid state transitions.
+ enum class StateEvent {
+ /// Indicates the start of execution.
+ PREPARE_START,
+ /// Indicates that codegen will get called. Omitted if not doing codegen.
+ CODEGEN_START,
+ /// Indicates the call to Open().
+ OPEN_START,
+ /// Indicates waiting for the first batch to arrive.
+ WAITING_FOR_FIRST_BATCH,
+ /// Indicates that a new batch was produced by this instance.
+ BATCH_PRODUCED,
+ /// Indicates that a batch has been sent.
+ BATCH_SENT,
+ /// Indicates that no new batches will be received.
+ LAST_BATCH_SENT,
+ /// Indicates the end of this instance's execution.
+ EXEC_END
+ };
+
+ /// The current state of this fragment instance's execution. Only updated by the
+ /// fragment instance thread in UpdateState() and read by the profile reporting threads.
+ AtomicEnum<TFInstanceExecState::type> current_state_;
+
/// Output sink for rows sent to this fragment. Created in Prepare(), lives in
/// obj_pool().
DataSink* sink_ = nullptr;
@@ -232,6 +270,12 @@ class FragmentInstanceState {
/// ReportProfileThread() thread will do periodically.
void SendReport(bool done, const Status& status);
+ /// Handle the execution event 'event'. This implements a state machine and will update
+ /// the current execution state of this fragment instance. Also marks an event in
+ /// 'event_sequence_' for some states. Must not be called by multiple threads
+ /// concurrently.
+ void UpdateState(const StateEvent event);
+
/// Called when execution is complete to finalize counters and send the final status
/// report. Must be called only once. Can handle partially-finished Prepare().
void Finalize(const Status& status);
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 259cd34..5a11caf 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -227,6 +227,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
instance_status.__set_fragment_instance_id(fis->instance_id());
status.SetTStatus(&instance_status);
instance_status.__set_done(done);
+ instance_status.__set_current_state(fis->current_state());
DCHECK(fis->profile() != nullptr);
fis->profile()->ToThrift(&instance_status.profile);
@@ -304,6 +305,7 @@ void QueryState::StartFInstances() {
DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
int fragment_ctx_idx = 0;
+ fragment_events_start_time_ = MonotonicStopWatch::Now();
for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
// determine corresponding TPlanFragmentCtx
if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index f7b83a7..ae3bdd5 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -109,7 +109,7 @@ class QueryState {
}
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
- // the following getters are only valid after Prepare()
+ // the following getters are only valid after Init()
ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
InitialReservations* initial_reservations() const { return initial_reservations_; }
TmpFileMgr::FileGroup* file_group() const { return file_group_; }
@@ -117,6 +117,7 @@ class QueryState {
// the following getters are only valid after StartFInstances()
const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
+ int64_t fragment_events_start_time() const { return fragment_events_start_time_; }
/// Sets up state required for fragment execution: memory reservations, etc. Fails
/// if resources could not be acquired. Acquires a resource refcount and returns it
@@ -243,6 +244,10 @@ class QueryState {
/// "num-queries-spilled" metric.
AtomicInt32 query_spilled_;
+ /// Records the point in time when fragment instances are started up. Set in
+ /// StartFInstances().
+ int64_t fragment_events_start_time_ = 0;
+
/// Create QueryState w/ refcnt of 0.
/// The query is associated with the resource pool query_ctx.request_pool or
/// 'request_pool', if the former is not set (needed for tests).
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 33c5e73..b633f2a 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -104,6 +104,9 @@ void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) {
webserver->RegisterUrlCallback("/query_backends", "query_backends.tmpl",
MakeCallback(this, &ImpalaHttpHandler::QueryBackendsHandler), false);
+ webserver->RegisterUrlCallback("/query_finstances", "query_finstances.tmpl",
+ MakeCallback(this, &ImpalaHttpHandler::QueryFInstancesHandler), false);
+
webserver->RegisterUrlCallback("/cancel_query", "common-pre.tmpl",
MakeCallback(this, &ImpalaHttpHandler::CancelQueryHandler), false);
@@ -708,6 +711,25 @@ void ImpalaHttpHandler::QueryBackendsHandler(
request_state->coord()->BackendsToJson(document);
}
+void ImpalaHttpHandler::QueryFInstancesHandler(
+ const Webserver::ArgumentMap& args, Document* document) {
+ TUniqueId query_id;
+ Status status = ParseIdFromArguments(args, &query_id, "query_id");
+ Value query_id_val(PrintId(query_id).c_str(), document->GetAllocator());
+ document->AddMember("query_id", query_id_val, document->GetAllocator());
+ if (!status.ok()) {
+ // Redact the error message, it may contain part or all of the query.
+ Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator());
+ document->AddMember("error", json_error, document->GetAllocator());
+ return;
+ }
+
+ shared_ptr<ClientRequestState> request_state = server_->GetClientRequestState(query_id);
+ if (request_state.get() == nullptr || request_state->coord() == nullptr) return;
+
+ request_state->coord()->FInstanceStatsToJson(document);
+}
+
void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include_summary,
const Webserver::ArgumentMap& args, Document* document) {
TUniqueId query_id;
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/service/impala-http-handler.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
index 8ad84bd..f2492ff 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -96,6 +96,11 @@ class ImpalaHttpHandler {
void QueryBackendsHandler(
const Webserver::ArgumentMap& args, rapidjson::Document* document);
+ /// If 'args' contains a query id, serializes all fragment instance states for all
+ /// backends for that query to 'document'.
+ void QueryFInstancesHandler(
+ const Webserver::ArgumentMap& args, rapidjson::Document* document);
+
/// Cancels an in-flight query and writes the result to 'contents'.
void CancelQueryHandler(const Webserver::ArgumentMap& args,
rapidjson::Document* document);
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 9227e66..62281f1 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -19,10 +19,11 @@
#ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
#define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+#include <algorithm>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_map.hpp>
-#include <sys/time.h>
#include <sys/resource.h>
+#include <sys/time.h>
#include "common/atomic.h"
#include "common/logging.h"
@@ -277,11 +278,10 @@ class RuntimeProfile::ThreadCounters {
Counter* involuntary_context_switches_;
};
-/// An EventSequence captures a sequence of events (each added by
-/// calling MarkEvent). Each event has a text label, and a time
-/// (measured relative to the moment Start() was called as t=0). It is
-/// useful for tracking the evolution of some serial process, such as
-/// the query lifecycle.
+/// An EventSequence captures a sequence of events (each added by calling MarkEvent()).
+/// Each event has a text label and a time (measured relative to the moment Start() was
+/// called as t=0, or to the parameter 'when' passed to Start(int64_t when)). It is useful
+/// for tracking the evolution of some serial process, such as the query lifecycle.
class RuntimeProfile::EventSequence {
public:
EventSequence() { }
@@ -298,12 +298,20 @@ class RuntimeProfile::EventSequence {
/// Starts the timer without resetting it.
void Start() { sw_.Start(); }
+ /// Starts the timer. All events will be recorded as if the timer had been started at
+ /// 'start_time_ns', which must have been obtained by calling MonotonicStopWatch::Now().
+ void Start(int64_t start_time_ns) {
+ offset_ = MonotonicStopWatch::Now() - start_time_ns;
+ DCHECK_GE(offset_, 0);
+ sw_.Start();
+ }
+
/// Stores an event in sequence with the given label and the current time
/// (relative to the first time Start() was called) as the timestamp.
- void MarkEvent(const std::string& label) {
- Event event = make_pair(label, sw_.ElapsedTime());
+ void MarkEvent(std::string label) {
+ Event event = make_pair(move(label), sw_.ElapsedTime());
boost::lock_guard<SpinLock> event_lock(lock_);
- events_.push_back(event);
+ events_.emplace_back(move(event));
}
int64_t ElapsedTime() { return sw_.ElapsedTime(); }
@@ -311,34 +319,57 @@ class RuntimeProfile::EventSequence {
/// An Event is a <label, timestamp> pair.
typedef std::pair<std::string, int64_t> Event;
- /// An EventList is a sequence of Events, in increasing timestamp order.
+ /// An EventList is a sequence of Events.
typedef std::vector<Event> EventList;
- /// Copies the member events_ into the supplied vector 'events'.
- /// The supplied vector 'events' is cleared before this.
+ /// Returns a copy of 'events_' in the supplied vector 'events', sorted by their
+ /// timestamps. The supplied vector 'events' is cleared before this.
void GetEvents(std::vector<Event>* events) {
events->clear();
boost::lock_guard<SpinLock> event_lock(lock_);
- /// It's possible that concurrent events can be logged out of sequence.
- /// So sort the events each time we are here.
+ /// It's possible that MarkEvent() logs concurrent events out of sequence so we sort
+ /// the events each time we are here.
+ SortEvents();
+ events->insert(events->end(), events_.begin(), events_.end());
+ }
+
+ /// Adds all events from the input parameters that are newer than the last member of
+ /// 'events_'. The caller must make sure that 'timestamps' is sorted.
+ void AddNewerEvents(
+ const std::vector<int64_t>& timestamps, const std::vector<std::string>& labels) {
+ DCHECK_EQ(timestamps.size(), labels.size());
+ DCHECK(std::is_sorted(timestamps.begin(), timestamps.end()));
+ boost::lock_guard<SpinLock> event_lock(lock_);
+ int64_t last_timestamp = events_.back().second;
+ for (int64_t i = 0; i < timestamps.size(); ++i) {
+ if (timestamps[i] <= last_timestamp) continue;
+ events_.push_back(make_pair(labels[i], timestamps[i]));
+ }
+ }
+
+ void ToThrift(TEventSequence* seq);
+
+ private:
+ /// Sorts events by their timestamp. Caller must hold lock_.
+ void SortEvents() {
std::sort(events_.begin(), events_.end(),
[](Event const &event1, Event const &event2) {
return event1.second < event2.second;
});
- events->insert(events->end(), events_.begin(), events_.end());
}
- void ToThrift(TEventSequence* seq) const;
-
- private:
/// Protect access to events_.
SpinLock lock_;
- /// Stored in increasing time order.
+ /// Sequence of events. Due to a race in MarkEvent() these are not necessarily ordered.
EventList events_;
/// Timer which allows events to be timestamped when they are recorded.
MonotonicStopWatch sw_;
+
+ /// Constant offset that gets added to each event's timestamp. This allows events
+ /// captured in multiple threads to be synchronized to a common starting point.
+ int64_t offset_ = 0;
};
typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index a05e55c..c057cac 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -291,7 +291,6 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
if (it == time_series_counter_map_.end()) {
time_series_counter_map_[c.name] =
pool_->Add(new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
- it = time_series_counter_map_.find(c.name);
} else {
it->second->samples_.SetSamples(c.period_ms, c.values);
}
@@ -299,6 +298,20 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
}
{
+ lock_guard<SpinLock> l(event_sequence_lock_);
+ for (int i = 0; i < node.event_sequences.size(); ++i) {
+ const TEventSequence& seq = node.event_sequences[i];
+ EventSequenceMap::iterator it = event_sequence_map_.find(seq.name);
+ if (it == event_sequence_map_.end()) {
+ event_sequence_map_[seq.name] =
+ pool_->Add(new EventSequence(seq.timestamps, seq.labels));
+ } else {
+ it->second->AddNewerEvents(seq.timestamps, seq.labels);
+ }
+ }
+ }
+
+ {
lock_guard<SpinLock> l(summary_stats_map_lock_);
for (int i = 0; i < node.summary_stats_counters.size(); ++i) {
const TSummaryStatsCounter& c = node.summary_stats_counters[i];
@@ -1058,7 +1071,11 @@ string RuntimeProfile::TimeSeriesCounter::DebugString() const {
return ss.str();
}
-void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) const {
+void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
+ lock_guard<SpinLock> l(lock_);
+ /// It's possible that concurrent events can be logged out of sequence so we sort the
+ /// events before serializing them.
+ SortEvents();
for (const EventSequence::Event& ev: events_) {
seq->labels.push_back(ev.first);
seq->timestamps.push_back(ev.second);
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/be/src/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index c1f85aa..443b655 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -145,20 +145,8 @@ class MonotonicStopWatch {
return total_time_;
}
- private:
- /// Start epoch value.
- uint64_t start_;
-
- /// Total elapsed time in nanoseconds.
- uint64_t total_time_;
-
- /// Upper bound of the running time as a epoch value. If the value is larger than 0,
- /// the stopwatch interprets this as a time ceiling is set.
- uint64_t time_ceiling_;
-
- /// True if stopwatch is running.
- bool running_;
-
+ /// Returns a representation of the current time in nanoseconds. It can be used to
+ /// measure time durations by repeatedly calling this function and comparing the result.
/// While this function returns nanoseconds, its resolution may be as large as
/// milliseconds, depending on OsInfo::fast_clock().
static inline int64_t Now() {
@@ -174,6 +162,20 @@ class MonotonicStopWatch {
#endif
}
+ private:
+ /// Start epoch value.
+ uint64_t start_;
+
+ /// Total elapsed time in nanoseconds.
+ uint64_t total_time_;
+
+ /// Upper bound of the running time as an epoch value. If the value is larger than 0,
+ /// the stopwatch interprets this as a time ceiling is set.
+ uint64_t time_ceiling_;
+
+ /// True if stopwatch is running.
+ bool running_;
+
/// Returns the time since start.
/// If time_ceiling_ is set, the stop watch won't run pass the ceiling.
uint64_t RunningTime() const {
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index a584ce1..121c551 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -50,7 +50,7 @@ enum TParquetFallbackSchemaResolution {
NAME
}
-// The order of the enum values needs to be kepy in sync with
+// The order of the enum values needs to be kept in sync with
// ParquetMetadataUtils::ORDERED_ARRAY_ENCODINGS in parquet-metadata-utils.cc.
enum TParquetArrayResolution {
THREE_LEVEL,
@@ -606,6 +606,21 @@ struct TErrorLogEntry {
2: list<string> messages
}
+// Represents the states that a fragment instance goes through during its execution. The
+// current state gets sent back to the coordinator and will be presented to users through
+// the debug webpages.
+enum TFInstanceExecState {
+ WAITING_FOR_EXEC,
+ WAITING_FOR_CODEGEN,
+ WAITING_FOR_PREPARE,
+ WAITING_FOR_OPEN,
+ WAITING_FOR_FIRST_BATCH,
+ FIRST_BATCH_PRODUCED,
+ PRODUCING_DATA,
+ LAST_BATCH_SENT,
+ FINISHED
+}
+
struct TFragmentInstanceExecStatus {
// required in V1
1: optional Types.TUniqueId fragment_instance_id
@@ -621,6 +636,10 @@ struct TFragmentInstanceExecStatus {
// cumulative profile
// required in V1
4: optional RuntimeProfile.TRuntimeProfileTree profile
+
+ // The current state of this fragment instance's execution.
+ // required in V1
+ 5: optional TFInstanceExecState current_state
}
struct TReportExecStatusParams {
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index e8599b5..85fc4f1 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -181,3 +181,16 @@ class TestObservability(ImpalaTestSuite):
dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
query_id, MAX_WAIT)
assert False, dbg_str
+
+ def test_query_profile_contains_instance_events(self, unique_database):
+ """Test that /query_profile_encoded contains an event timeline for fragment
+ instances, even when there are errors."""
+ events = ["Fragment Instance Lifecycle Event Timeline",
+ "Prepare Finished",
+ "First Batch Produced",
+ "First Batch Sent",
+ "ExecInternal Finished"]
+ query = "select count(*) from functional.alltypes"
+ runtime_profile = self.execute_query(query).runtime_profile
+ for event in events:
+ assert event in runtime_profile
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 54163dc..8dd17a4 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -31,6 +31,7 @@ class TestWebPage(ImpalaTestSuite):
CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
+ QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
# log4j changes do not apply to the statestore since it doesn't
# have an embedded JVM. So we make two sets of ports to test the
@@ -173,7 +174,21 @@ class TestWebPage(ImpalaTestSuite):
self.get_and_check_status(self.TABLE_METRICS_URL +
"?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT)
- def test_query_details(self, unique_database):
+ def __run_query_and_get_debug_page(self, query, page_url):
+ """Runs a query to obtain the content of the debug page pointed to by page_url, then
+ cancels the query."""
+ query_handle = self.client.execute_async(query)
+ response_json = ""
+ try:
+ response = self.get_and_check_status(
+ page_url + "?query_id=%s&json" % query_handle.get_handle().id,
+ ports_to_test=[25000])
+ response_json = json.loads(response)
+ finally:
+ self.client.cancel(query_handle)
+ return response_json
+
+ def test_backend_states(self, unique_database):
"""Test that /query_backends returns the list of backend states for DML or queries;
nothing for DDL statements"""
CROSS_JOIN = ("select count(*) from functional.alltypes a "
@@ -181,20 +196,27 @@ class TestWebPage(ImpalaTestSuite):
for q in [CROSS_JOIN,
"CREATE TABLE {0}.foo AS {1}".format(unique_database, CROSS_JOIN),
"DESCRIBE functional.alltypes"]:
- query_handle = self.client.execute_async(q)
- try:
- response = self.get_and_check_status(
- self.QUERY_BACKENDS_URL + "?query_id=%s&json" % query_handle.get_handle().id,
- ports_to_test=[25000])
+ response_json = self.__run_query_and_get_debug_page(q, self.QUERY_BACKENDS_URL)
- response_json = json.loads(response)
+ if "DESCRIBE" not in q:
+ assert len(response_json['backend_states']) > 0
+ else:
+ assert 'backend_states' not in response_json
- if "DESCRIBE" not in q:
- assert len(response_json['backend_states']) > 0
- else:
- assert 'backend_states' not in response_json
- finally:
- self.client.cancel(query_handle)
+ def test_backend_instances(self, unique_database):
+ """Test that /query_finstances returns the list of fragment instances for DML or
+ queries; nothing for DDL statements"""
+ CROSS_JOIN = ("select count(*) from functional.alltypes a "
+ "CROSS JOIN functional.alltypes b CROSS JOIN functional.alltypes c")
+ for q in [CROSS_JOIN,
+ "CREATE TABLE {0}.foo AS {1}".format(unique_database, CROSS_JOIN),
+ "DESCRIBE functional.alltypes"]:
+ response_json = self.__run_query_and_get_debug_page(q, self.QUERY_FINSTANCES_URL)
+
+ if "DESCRIBE" not in q:
+ assert len(response_json['backend_instances']) > 0
+ else:
+ assert 'backend_instances' not in response_json
def test_io_mgr_threads(self):
"""Test that IoMgr threads have readable names. This test assumed that all systems we
@@ -207,4 +229,3 @@ class TestWebPage(ImpalaTestSuite):
for pattern in expected_name_patterns:
assert any(pattern in t for t in thread_names), \
"Could not find thread matching '%s'" % pattern
-
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/www/query_detail_tabs.tmpl
----------------------------------------------------------------------
diff --git a/www/query_detail_tabs.tmpl b/www/query_detail_tabs.tmpl
index 0318761..8e413a1 100644
--- a/www/query_detail_tabs.tmpl
+++ b/www/query_detail_tabs.tmpl
@@ -28,4 +28,5 @@ under the License.
<li id="profile-tab" role="presentation"><a href="/query_profile?query_id={{query_id}}">Profile</a></li>
<li id="memory-tab" role="presentation"><a href="/query_memory?query_id={{query_id}}">Memory</a></li>
<li id="backends-tab" role="presentation"><a href="/query_backends?query_id={{query_id}}">Backends</a></li>
+ <li id="finstances-tab" role="presentation"><a href="/query_finstances?query_id={{query_id}}">Fragment Instances</a></li>
</ul>
http://git-wip-us.apache.org/repos/asf/impala/blob/057cc51b/www/query_finstances.tmpl
----------------------------------------------------------------------
diff --git a/www/query_finstances.tmpl b/www/query_finstances.tmpl
new file mode 100644
index 0000000..5d8ba33
--- /dev/null
+++ b/www/query_finstances.tmpl
@@ -0,0 +1,129 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+{{> www/common-header.tmpl }}
+{{> www/query_detail_tabs.tmpl }}
+<br/>
+{{?backend_instances}}
+<div>
+ <label>
+ <input type="checkbox" checked="true" id="toggle" onClick="toggleRefresh()"/>
+ <span id="refresh_on">Auto-refresh on</span>
+ </label> Last updated: <span id="last-updated"></span>
+</div>
+
+<br/>
+<table id="finstances" class='table table-hover table-bordered'>
+ <thead>
+ <tr>
+ <th>Host</th>
+ <th>Fragment<br/>Name</th>
+ <th>Instance ID</th>
+ <th>Current state</th>
+ <th>Done</th>
+ <th>Time since last report (ms)</th>
+ </tr>
+ </thead>
+ <tbody>
+
+ </tbody>
+</table>
+
+<script>
+document.getElementById("finstances-tab").className = "active";
+
+var intervalId = 0;
+var table = null;
+var refresh = function () {
+ table.ajax.reload();
+ document.getElementById("last-updated").textContent = new Date();
+};
+
+// Unpack Json backend_states by merging the backend host name into every instance stats
+// row. Also clears the last report timestamp field for instances that have not started or
+// have already finished execution.
+function unpackJson(json) {
+ var result = new Array();
+ if (typeof json.backend_instances === "undefined") {
+ // Table will be empty, remove it.
+ table.table().destroy(true);
+ $("#finstances").remove();
+ // Display completion message.
+ $("#query_finished_alert").css("visibility", "visible");
+ // Stop auto refresh
+ $("#toggle").prop("checked", false);
+ toggleRefresh();
+ return json;
+ }
+ for (var i = 0; i < json.backend_instances.length; ++i) {
+ var backend_state = json.backend_instances[i];
+ var instance_stats = backend_state.instance_stats;
+ for (var j = 0; j < instance_stats.length; ++j) {
+ var instance = instance_stats[j];
+ instance.host = backend_state.host;
+ if (instance.done) instance.time_since_last_heard_from = "";
+ if (!instance.first_status_update_received) {
+ instance.time_since_last_heard_from = "";
+ }
+ delete instance.first_status_update_received;
+ result.push(instance);
+ }
+ }
+ return result;
+}
+
+$(document).ready(function() {
+ table = $('#finstances').DataTable({
+ ajax: { url: "/query_finstances?query_id={{query_id}}&json",
+ dataSrc: unpackJson,
+ },
+ "columns": [ {data: 'host'},
+ {data: 'fragment_name'},
+ {data: 'instance_id'},
+ {data: 'current_state'},
+ {data: 'done'},
+ {data: 'time_since_last_heard_from'}],
+ "order": [[ 0, "desc" ]],
+ "pageLength": 100
+ });
+ intervalId = setInterval(refresh, 1000);
+});
+
+function toggleRefresh() {
+ if (document.getElementById("toggle").checked == true) {
+ intervalId = setInterval(refresh, 1000);
+ document.getElementById("refresh_on").textContent = "Auto-refresh on";
+ } else {
+ clearInterval(intervalId);
+ document.getElementById("refresh_on").textContent = "Auto-refresh off";
+ }
+}
+
+</script>
+{{/backend_instances}}
+
+<div class="alert alert-info" role="alert" id="query_finished_alert"
+ style="visibility:hidden">
+Query <strong>{{query_id}}</strong> has completed, or has not started any backends, yet.
+</div>
+{{^backend_instances}}
+<script>$("#query_finished_alert").css("visibility", "visible");</script>
+{{/backend_instances}}
+
+{{> www/common-footer.tmpl }}