You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/24 07:03:26 UTC

[6/9] impala git commit: IMPALA-6190/6246: Add instances tab and event sequence

IMPALA-6190/6246: Add instances tab and event sequence

This change adds tracking of the current state during the execution of a
fragment instance. The current state is then reported back to the
coordinator and exposed to users via a new tab in the query detail debug
webpage.

This change also adds an event timeline to fragment instances in the
query profile. For each event, the timeline records the time elapsed
between the backend-local query start and the completion of that event.
Events are derived from the current state of the execution of a fragment
instance. For example (the value in parentheses is the time elapsed
since the previous event):

    - Prepare Finished: 13.436ms (13.436ms)
    - First Batch Produced: 1s022ms (1s008ms)
    - First Batch Sent: 1s022ms (455.558us)
    - ExecInternal Finished: 2s783ms (1s760ms)

I added automated tests for both extensions and additionally verified
the change by manual inspection.

Here are the TPCH performance comparison results between this change and
the previous commit on a 16-node cluster.

+------------+-----------------------+---------+------------+------------+----------------+
| Workload   | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+------------+-----------------------+---------+------------+------------+----------------+
| TPCH(_300) | parquet / none / none | 18.47   | -0.94%     | 9.72       | -1.08%         |
+------------+-----------------------+---------+------------+------------+----------------+

+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Num Clients | Iters |
+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| TPCH(_300) | TPCH-Q5  | parquet / none / none | 48.88  | 46.93       |   +4.15%   |   0.14%    |   3.61%        | 1           | 3     |
| TPCH(_300) | TPCH-Q13 | parquet / none / none | 21.64  | 21.15       |   +2.29%   |   2.06%    |   1.84%        | 1           | 3     |
| TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.71   | 1.70        |   +1.12%   |   0.54%    |   2.51%        | 1           | 3     |
| TPCH(_300) | TPCH-Q18 | parquet / none / none | 33.15  | 32.79       |   +1.09%   |   0.13%    |   2.03%        | 1           | 3     |
| TPCH(_300) | TPCH-Q14 | parquet / none / none | 5.95   | 5.90        |   +0.82%   |   2.19%    |   0.49%        | 1           | 3     |
| TPCH(_300) | TPCH-Q1  | parquet / none / none | 13.99  | 13.90       |   +0.63%   |   0.25%    |   1.39%        | 1           | 3     |
| TPCH(_300) | TPCH-Q2  | parquet / none / none | 3.44   | 3.44        |   +0.00%   | * 20.29% * | * 20.76% *     | 1           | 3     |
| TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.21   | 1.22        |   -0.01%   |   0.06%    |   0.06%        | 1           | 3     |
| TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.51   | 3.51        |   -0.11%   |   7.15%    |   7.30%        | 1           | 3     |
| TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.89   | 6.91        |   -0.21%   |   0.65%    |   0.55%        | 1           | 3     |
| TPCH(_300) | TPCH-Q4  | parquet / none / none | 4.78   | 4.80        |   -0.38%   |   0.06%    |   0.59%        | 1           | 3     |
| TPCH(_300) | TPCH-Q19 | parquet / none / none | 30.78  | 31.04       |   -0.83%   |   0.45%    |   1.03%        | 1           | 3     |
| TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.06   | 6.12        |   -1.02%   |   1.51%    |   2.12%        | 1           | 3     |
| TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.43   | 9.58        |   -1.54%   |   0.69%    |   3.30%        | 1           | 3     |
| TPCH(_300) | TPCH-Q21 | parquet / none / none | 93.41  | 95.18       |   -1.86%   |   0.08%    |   0.81%        | 1           | 3     |
| TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.40   | 3.47        |   -1.99%   |   0.72%    |   1.27%        | 1           | 3     |
| TPCH(_300) | TPCH-Q7  | parquet / none / none | 44.98  | 46.24       |   -2.71%   |   1.83%    |   1.27%        | 1           | 3     |
| TPCH(_300) | TPCH-Q3  | parquet / none / none | 28.06  | 29.11       |   -3.61%   |   1.62%    |   1.23%        | 1           | 3     |
| TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.15   | 3.28        |   -3.80%   |   0.96%    |   1.32%        | 1           | 3     |
| TPCH(_300) | TPCH-Q9  | parquet / none / none | 29.47  | 30.80       |   -4.30%   |   0.29%    |   0.34%        | 1           | 3     |
| TPCH(_300) | TPCH-Q17 | parquet / none / none | 4.37   | 4.62        |   -5.33%   |   0.63%    |   0.54%        | 1           | 3     |
| TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.99   | 8.46        |   -5.53%   |   7.95%    |   1.11%        | 1           | 3     |
+------------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+

Here are the TPCDS performance comparison results between this change
and the previous commit on a 16-node cluster. I inspected the TPCDS-Q2
results (flagged as a regression below) and concluded that the
variability is unrelated to this change.

+--------------+-----------------------+---------+------------+------------+----------------+
| Workload     | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+--------------+-----------------------+---------+------------+------------+----------------+
| TPCDS(_1000) | parquet / none / none | 13.07   | +0.51%     | 4.27       | +1.83%         |
+--------------+-----------------------+---------+------------+------------+----------------+

+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| Workload     | Query      | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Num Clients | Iters |
+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| TPCDS(_1000) | TPCDS-Q2   | parquet / none / none | 8.36   | 4.25        | R +96.81%  | * 48.88% * |   0.42%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q8   | parquet / none / none | 1.59   | 1.35        |   +17.86%  | * 13.91% * |   4.01%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q73  | parquet / none / none | 1.81   | 1.71        |   +5.92%   |   5.53%    |   0.15%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q28  | parquet / none / none | 7.26   | 6.95        |   +4.47%   |   1.09%    |   1.11%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q46  | parquet / none / none | 2.36   | 2.30        |   +2.62%   |   1.45%    |   0.40%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q7   | parquet / none / none | 2.78   | 2.73        |   +1.98%   |   1.21%    |   2.23%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q55  | parquet / none / none | 1.05   | 1.03        |   +1.91%   |   1.16%    |   2.20%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q42  | parquet / none / none | 1.05   | 1.04        |   +1.71%   |   0.90%    |   2.63%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q19  | parquet / none / none | 1.67   | 1.65        |   +1.55%   |   1.12%    |   1.96%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q23  | parquet / none / none | 151.75 | 149.94      |   +1.20%   |   3.23%    |   1.83%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q64  | parquet / none / none | 40.25  | 39.79       |   +1.16%   |   0.43%    |   0.28%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q96  | parquet / none / none | 2.25   | 2.22        |   +1.05%   |   1.00%    |   0.11%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q53  | parquet / none / none | 1.60   | 1.58        |   +1.01%   |   1.28%    |   0.04%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q79  | parquet / none / none | 4.17   | 4.13        |   +0.94%   |   0.89%    |   0.06%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q59  | parquet / none / none | 5.74   | 5.71        |   +0.60%   |   1.22%    |   2.56%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q52  | parquet / none / none | 0.89   | 0.89        |   +0.14%   |   0.03%    |   0.63%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q88  | parquet / none / none | 7.10   | 7.12        |   -0.23%   |   0.43%    |   0.47%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q3   | parquet / none / none | 1.10   | 1.11        |   -0.40%   |   0.58%    |   0.36%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q98  | parquet / none / none | 2.30   | 2.31        |   -0.49%   |   3.58%    |   1.04%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q61  | parquet / none / none | 1.87   | 1.89        |   -1.08%   |   1.68%    |   0.14%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q27a | parquet / none / none | 2.93   | 2.96        |   -1.18%   |   1.74%    |   1.54%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q34  | parquet / none / none | 2.23   | 2.27        |   -1.73%   |   1.91%    |   1.32%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q63  | parquet / none / none | 1.56   | 1.60        |   -1.96%   |   1.91%    |   3.33%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q89  | parquet / none / none | 2.64   | 2.70        |   -2.20%   |   1.93%    |   1.88%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q47  | parquet / none / none | 30.41  | 31.17       |   -2.41%   |   1.09%    |   1.52%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q1   | parquet / none / none | 3.77   | 3.86        |   -2.46%   |   1.91%    |   0.61%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q6   | parquet / none / none | 61.67  | 63.34       |   -2.65%   |   3.77%    |   0.31%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q4   | parquet / none / none | 31.11  | 31.96       |   -2.66%   |   0.61%    |   0.77%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q43  | parquet / none / none | 4.10   | 4.22        |   -2.87%   |   1.40%    |   2.85%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q5   | parquet / none / none | 8.30   | 8.56        |   -3.13%   |   1.55%    |   0.47%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q27  | parquet / none / none | 2.28   | 2.35        |   -3.13%   |   1.17%    |   1.56%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q65  | parquet / none / none | 31.74  | 32.77       |   -3.15%   |   1.47%    |   1.11%        | 1           | 3     |
| TPCDS(_1000) | TPCDS-Q68  | parquet / none / none | 1.56   | 1.62        |   -3.58%   |   9.37%    | * 11.93% *     | 1           | 3     |
+--------------+------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+

(R) Regression: TPCDS(_1000) TPCDS-Q2 [parquet / none / none] (4.25s -> 8.36s [+96.81%])
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+
| Operator            | % of Query | Avg      | Base Avg | Delta(Avg) | StdDev(%)  | Max      | Base Max | Delta(Max) | #Hosts | #Rows   | Est #Rows |
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+
| 27:MERGING-EXCHANGE | 22.48%     | 6.97s    | 2.85s    | +144.40%   | * 58.44% * | 11.05s   | 2.86s    | +286.33%   | 1      | 2.51K   | 2.56K     |
| 26:EXCHANGE         | 7.65%      | 2.37s    | 2.43s    | -2.16%     |   1.82%    | 2.46s    | 2.50s    | -1.65%     | 14     | 365     | 2.56K     |
| 23:EXCHANGE         | 8.58%      | 2.66s    | 2.70s    | -1.46%     |   1.67%    | 2.74s    | 2.78s    | -1.47%     | 14     | 516     | 10.64K    |
| 13:AGGREGATE        | 4.21%      | 1.31s    | 1.30s    | +0.65%     |   0.06%    | 1.47s    | 1.43s    | +2.38%     | 14     | 516     | 10.64K    |
| 12:HASH JOIN        | 2.89%      | 896.20ms | 885.79ms | +1.17%     |   1.43%    | 1.06s    | 1.01s    | +4.77%     | 14     | 433.27M | 2.16B     |
| 06:SCAN HDFS        | 2.83%      | 877.34ms | 886.93ms | -1.08%     |   1.23%    | 888.16ms | 906.88ms | -2.06%     | 1      | 365     | 373       |
| 19:EXCHANGE         | 23.20%     | 7.20s    | 3.12s    | +130.58%   | * 56.73% * | 11.33s   | 3.17s    | +256.92%   | 14     | 520     | 10.64K    |
| 05:AGGREGATE        | 12.06%     | 3.74s    | 1.34s    | +178.49%   | * 64.53% * | 6.33s    | 1.53s    | +314.84%   | 14     | 520     | 10.64K    |
| 04:HASH JOIN        | 7.71%      | 2.39s    | 956.81ms | +149.90%   | * 60.36% * | 4.04s    | 1.13s    | +256.75%   | 14     | 442.29M | 2.16B     |
| 03:SCAN HDFS        | 2.83%      | 878.97ms | 894.11ms | -1.69%     |   1.34%    | 890.78ms | 910.22ms | -2.14%     | 1      | 371     | 73.05K    |
+---------------------+------------+----------+----------+------------+------------+----------+----------+------------+--------+---------+-----------+

Change-Id: I626456b6afa9101eeeeffd5cda10c4096d63d7f9
Reviewed-on: http://gerrit.cloudera.org:8080/8758
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/be0f4f9d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/be0f4f9d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/be0f4f9d

Branch: refs/heads/master
Commit: be0f4f9d21ac3c34f6ebf214e26686a008b57e24
Parents: c2d27ca
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Nov 28 15:08:29 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jan 24 02:41:31 2018 +0000

----------------------------------------------------------------------
 be/src/common/atomic.h                      |  17 +++
 be/src/runtime/coordinator-backend-state.cc |  46 ++++++++
 be/src/runtime/coordinator-backend-state.h  |  26 ++++-
 be/src/runtime/coordinator.cc               |  29 +++--
 be/src/runtime/coordinator.h                |   4 +
 be/src/runtime/fragment-instance-state.cc   | 130 ++++++++++++++++++++---
 be/src/runtime/fragment-instance-state.h    |  54 +++++++++-
 be/src/runtime/query-state.cc               |   2 +
 be/src/runtime/query-state.h                |   7 +-
 be/src/service/impala-http-handler.cc       |  22 ++++
 be/src/service/impala-http-handler.h        |   5 +
 be/src/util/runtime-profile-counters.h      |  69 ++++++++----
 be/src/util/runtime-profile.cc              |  21 +++-
 be/src/util/stopwatch.h                     |  30 +++---
 common/thrift/ImpalaInternalService.thrift  |  21 +++-
 tests/query_test/test_observability.py      |  13 +++
 tests/webserver/test_web_pages.py           |  49 ++++++---
 www/query_detail_tabs.tmpl                  |   1 +
 www/query_finstances.tmpl                   | 129 ++++++++++++++++++++++
 19 files changed, 594 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 791f3b2..3925137 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -132,6 +132,23 @@ class AtomicPtr {
   internal::AtomicInt<intptr_t> ptr_;
 };
 
+/// Atomic enum. Operations have the same semantics as AtomicInt.
+template<typename T>
+class AtomicEnum {
+  static_assert(std::is_enum<T>::value, "Type must be enum");
+  static_assert(sizeof(typename std::underlying_type<T>::type) <= sizeof(int32_t),
+      "Underlying enum type must fit into 4 bytes");
+
+ public:
+  /// Atomic load with "acquire" memory-ordering semantic.
+  ALWAYS_INLINE T Load() const { return static_cast<T>(enum_.Load()); }
+
+  /// Atomic store with "release" memory-ordering semantic.
+  ALWAYS_INLINE void Store(T val) { enum_.Store(val); }
+
+ private:
+  internal::AtomicInt<int32_t> enum_;
+};
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 9d89086..914a3e4 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -453,6 +453,7 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
 void Coordinator::BackendState::InstanceStats::Update(
     const TFragmentInstanceExecStatus& exec_status,
     ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
+  last_report_time_ms_ = MonotonicMillis();
   if (exec_status.done) stopwatch_.Stop();
   profile_->Update(exec_status.profile);
   if (!profile_created_) {
@@ -496,6 +497,32 @@ void Coordinator::BackendState::InstanceStats::Update(
   int64_t delta = total - total_ranges_complete_;
   total_ranges_complete_ = total;
   scan_range_progress->Update(delta);
+
+  // extract the current execution state of this instance
+  current_state_ = exec_status.current_state;
+}
+
+void Coordinator::BackendState::InstanceStats::ToJson(Value* value, Document* document) {
+  Value instance_id_val(PrintId(exec_params_.instance_id).c_str(),
+      document->GetAllocator());
+  value->AddMember("instance_id", instance_id_val, document->GetAllocator());
+
+  // We send 'done' explicitly so we don't have to infer it by comparison with a string
+  // constant in the debug page JS code.
+  value->AddMember("done", done_, document->GetAllocator());
+
+  Value state_val(FragmentInstanceState::ExecStateToString(current_state_).c_str(),
+      document->GetAllocator());
+  value->AddMember("current_state", state_val, document->GetAllocator());
+
+  Value fragment_name_val(exec_params_.fragment().display_name.c_str(),
+      document->GetAllocator());
+  value->AddMember("fragment_name", fragment_name_val, document->GetAllocator());
+
+  value->AddMember("first_status_update_received", last_report_time_ms_ > 0,
+      document->GetAllocator());
+  value->AddMember("time_since_last_heard_from", MonotonicMillis() - last_report_time_ms_,
+      document->GetAllocator());
 }
 
 Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
@@ -583,3 +610,22 @@ void Coordinator::BackendState::ToJson(Value* value, Document* document) {
   value->AddMember(
       "num_remaining_instances", num_remaining_instances_, document->GetAllocator());
 }
+
+void Coordinator::BackendState::InstanceStatsToJson(Value* value, Document* document) {
+  Value instance_stats(kArrayType);
+  {
+    lock_guard<mutex> l(lock_);
+    for (const auto& elem : instance_stats_map_) {
+      Value val(kObjectType);
+      elem.second->ToJson(&val, document);
+      instance_stats.PushBack(val, document->GetAllocator());
+    }
+    DCHECK_EQ(instance_stats.Size(), fragments_.size());
+  }
+  value->AddMember("instance_stats", instance_stats, document->GetAllocator());
+
+  // impalad_address is not protected by lock_. The lifetime of the backend state is
+  // protected by Coordinator::lock_.
+  Value val(TNetworkAddressToString(impalad_address()).c_str(), document->GetAllocator());
+  value->AddMember("host", val, document->GetAllocator());
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 73acef9..0973ca3 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -124,6 +124,10 @@ class Coordinator::BackendState {
   /// number of instances, peak memory consumption, host and status amongst others.
   void ToJson(rapidjson::Value* value, rapidjson::Document* doc);
 
+  /// Serializes the InstanceStats of all instances of this backend state to JSON by
+  /// adding members to 'value', including the remote host name.
+  void InstanceStatsToJson(rapidjson::Value* value, rapidjson::Document* doc);
+
  private:
   /// Execution stats for a single fragment instance.
   /// Not thread-safe.
@@ -132,9 +136,9 @@ class Coordinator::BackendState {
     InstanceStats(const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
         ObjectPool* obj_pool);
 
-    /// Update 'this' with exec_status, the fragment instances' TExecStats in
-    /// exec_summary, and 'progress_updater' with the number of
-    /// newly completed scan ranges. Also updates the instance's avg profile.
+    /// Updates 'this' with exec_status, the fragment instances' TExecStats in
+    /// exec_summary, and 'progress_updater' with the number of newly completed scan
+    /// ranges. Also updates the instance's avg profile.
     void Update(const TFragmentInstanceExecStatus& exec_status,
         ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
 
@@ -142,12 +146,20 @@ class Coordinator::BackendState {
       return exec_params_.per_fragment_instance_idx;
     }
 
+    /// Serializes instance stats to JSON by adding members to 'value', including its
+    /// instance id, plan fragment name, and the last event that was recorded during
+    /// execution of the instance.
+    void ToJson(rapidjson::Value* value, rapidjson::Document* doc);
+
    private:
     friend class BackendState;
 
     /// query lifetime
     const FInstanceExecParams& exec_params_;
 
+    /// Set in Update(). Uses MonotonicMillis().
+    int64_t last_report_time_ms_ = 0;
+
     /// owned by coordinator object pool provided in the c'tor, created in Update()
     RuntimeProfile* profile_;
 
@@ -172,9 +184,13 @@ class Coordinator::BackendState {
     std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
 
     /// PER_HOST_PEAK_MEM_COUNTER
-    RuntimeProfile::Counter* peak_mem_counter_;
+    RuntimeProfile::Counter* peak_mem_counter_ = nullptr;
+
+    /// The current state of this fragment instance's execution. This gets serialized in
+    /// ToJson() and is displayed in the debug webpages.
+    TFInstanceExecState::type current_state_ = TFInstanceExecState::WAITING_FOR_EXEC;
 
-    /// Extract scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
+    /// Extracts scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
     void InitCounters();
   };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 85ff810..7973775 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -459,8 +459,8 @@ Status Coordinator::GetStatus() {
   return query_status_;
 }
 
-  Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
-     bool is_fragment_failure, const TUniqueId& instance_id) {
+Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+    bool is_fragment_failure, const TUniqueId& instance_id) {
   {
     lock_guard<mutex> l(lock_);
 
@@ -1258,13 +1258,28 @@ MemTracker* Coordinator::query_mem_tracker() const {
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
-  lock_guard<mutex> l(lock_);
   Value states(kArrayType);
-  for (BackendState* state : backend_states_) {
-    Value val(kObjectType);
-    state->ToJson(&val, doc);
-    states.PushBack(val, doc->GetAllocator());
+  {
+    lock_guard<mutex> l(lock_);
+    for (BackendState* state : backend_states_) {
+      Value val(kObjectType);
+      state->ToJson(&val, doc);
+      states.PushBack(val, doc->GetAllocator());
+    }
   }
   doc->AddMember("backend_states", states, doc->GetAllocator());
 }
+
+void Coordinator::FInstanceStatsToJson(Document* doc) {
+  Value states(kArrayType);
+  {
+    lock_guard<mutex> l(lock_);
+    for (BackendState* state : backend_states_) {
+      Value val(kObjectType);
+      state->InstanceStatsToJson(&val, doc);
+      states.PushBack(val, doc->GetAllocator());
+    }
+  }
+  doc->AddMember("backend_instances", states, doc->GetAllocator());
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e7ddee9..d630b9a 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -183,6 +183,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// 'backend_states'.
   void BackendsToJson(rapidjson::Document* document);
 
+  /// Adds to 'document' a serialized array of all backend names and stats of all fragment
+  /// instances running on each backend in a member named 'backend_instances'.
+  void FInstanceStatsToJson(rapidjson::Document* document);
+
  private:
   class BackendState;
   struct FilterTarget;

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 5c9deb8..16b4a7e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -61,7 +61,6 @@ static const string OPEN_TIMER_NAME = "OpenTime";
 static const string PREPARE_TIMER_NAME = "PrepareTime";
 static const string EXEC_TIMER_NAME = "ExecTime";
 
-
 FragmentInstanceState::FragmentInstanceState(
     QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& instance_ctx)
@@ -91,6 +90,7 @@ Status FragmentInstanceState::Exec() {
   }
 
 done:
+  UpdateState(StateEvent::EXEC_END);
   // call this before Close() to make sure the thread token got released
   Finalize(status);
   Close();
@@ -114,7 +114,6 @@ void FragmentInstanceState::Cancel() {
 
 Status FragmentInstanceState::Prepare() {
   DCHECK(!prepared_promise_.IsSet());
-
   VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
 
   // Do not call RETURN_IF_ERROR or explicitly return before this line,
@@ -129,11 +128,17 @@ Status FragmentInstanceState::Prepare() {
   profile()->AddChild(timings_profile_);
   SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
 
+  // Events that are tracked in a separate timeline for each fragment instance, relative
+  // to the startup of the query state.
+  event_sequence_ =
+      profile()->AddEventSequence("Fragment Instance Lifecycle Event Timeline");
+  event_sequence_->Start(query_state_->fragment_events_start_time());
+  UpdateState(StateEvent::PREPARE_START);
+
   runtime_state_->InitFilterBank();
 
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();
-
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
       bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
           runtime_state_->resource_pool()));
@@ -201,15 +206,6 @@ Status FragmentInstanceState::Prepare() {
     ReleaseThreadToken();
   }
 
-  if (runtime_state_->ShouldCodegen()) {
-    RETURN_IF_ERROR(runtime_state_->CreateCodegen());
-    exec_tree_->Codegen(runtime_state_);
-    // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
-    // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
-    // the error status for now.
-    RETURN_IF_ERROR(runtime_state_->CodegenScalarFns());
-  }
-
   // set up profile counters
   profile()->AddChild(exec_tree_->runtime_profile());
   rows_produced_counter_ =
@@ -247,14 +243,22 @@ Status FragmentInstanceState::Open() {
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
-  // codegen prior to exec_tree_->Open()
   if (runtime_state_->ShouldCodegen()) {
+    UpdateState(StateEvent::CODEGEN_START);
+    RETURN_IF_ERROR(runtime_state_->CreateCodegen());
+    exec_tree_->Codegen(runtime_state_);
+    // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
+    // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
+    // the error status for now.
+    RETURN_IF_ERROR(runtime_state_->CodegenScalarFns());
+
     LlvmCodeGen* codegen = runtime_state_->codegen();
     DCHECK(codegen != nullptr);
     RETURN_IF_ERROR(codegen->FinalizeModule());
   }
 
   {
+    UpdateState(StateEvent::OPEN_START);
     SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
     RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
   }
@@ -266,6 +270,7 @@ Status FragmentInstanceState::ExecInternal() {
       ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   bool exec_tree_complete = false;
+  UpdateState(StateEvent::WAITING_FOR_FIRST_BATCH);
   do {
     Status status;
     row_batch_->Reset();
@@ -274,11 +279,14 @@ Status FragmentInstanceState::ExecInternal() {
       RETURN_IF_ERROR(
           exec_tree_->GetNext(runtime_state_, row_batch_.get(), &exec_tree_complete));
     }
+    UpdateState(StateEvent::BATCH_PRODUCED);
     if (VLOG_ROW_IS_ON) row_batch_->VLogRows("FragmentInstanceState::ExecInternal()");
     COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
     RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get()));
+    UpdateState(StateEvent::BATCH_SENT);
   } while (!exec_tree_complete);
 
+  UpdateState(StateEvent::LAST_BATCH_SENT);
   // Flush the sink *before* stopping the report thread. Flush may need to add some
   // important information to the last report that gets sent. (e.g. table sinks record the
   // files they have written to in this method)
@@ -374,6 +382,79 @@ void FragmentInstanceState::SendReport(bool done, const Status& status) {
   query_state_->ReportExecStatus(done, status, this);
 }
 
+void FragmentInstanceState::UpdateState(const StateEvent event)
+{
+  TFInstanceExecState::type current_state = current_state_.Load();
+  TFInstanceExecState::type next_state = current_state;
+  switch (event) {
+    case StateEvent::PREPARE_START:
+      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_EXEC);
+      next_state = TFInstanceExecState::WAITING_FOR_PREPARE;
+      break;
+
+    case StateEvent::CODEGEN_START:
+      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_PREPARE);
+      event_sequence_->MarkEvent("Prepare Finished");
+      next_state = TFInstanceExecState::WAITING_FOR_CODEGEN;
+      break;
+
+    case StateEvent::OPEN_START:
+      if (current_state == TFInstanceExecState::WAITING_FOR_PREPARE) {
+        event_sequence_->MarkEvent("Prepare Finished");
+      } else {
+        DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_CODEGEN);
+      }
+      next_state = TFInstanceExecState::WAITING_FOR_OPEN;
+      break;
+
+    case StateEvent::WAITING_FOR_FIRST_BATCH:
+      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_OPEN);
+      next_state = TFInstanceExecState::WAITING_FOR_FIRST_BATCH;
+      break;
+
+    case StateEvent::BATCH_PRODUCED:
+      if (UNLIKELY(current_state == TFInstanceExecState::WAITING_FOR_FIRST_BATCH)) {
+        event_sequence_->MarkEvent("First Batch Produced");
+        next_state = TFInstanceExecState::FIRST_BATCH_PRODUCED;
+      } else {
+        DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+      }
+      break;
+
+    case StateEvent::BATCH_SENT:
+      if (UNLIKELY(current_state == TFInstanceExecState::FIRST_BATCH_PRODUCED)) {
+        event_sequence_->MarkEvent("First Batch Sent");
+        next_state = TFInstanceExecState::PRODUCING_DATA;
+      } else {
+        DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+      }
+      break;
+
+    case StateEvent::LAST_BATCH_SENT:
+      if (UNLIKELY(current_state == TFInstanceExecState::WAITING_FOR_OPEN)) {
+        event_sequence_->MarkEvent("Open Finished");
+      } else {
+        DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+      }
+      next_state = TFInstanceExecState::LAST_BATCH_SENT;
+      break;
+
+    case StateEvent::EXEC_END:
+      // Allow abort in all states to make error handling easier.
+      event_sequence_->MarkEvent("ExecInternal Finished");
+      next_state = TFInstanceExecState::FINISHED;
+      break;
+
+    default:
+      DCHECK(false) << "Unexpected Event: " << static_cast<int>(event);
+      break;
+  }
+  // current_state_ is an AtomicEnum to add memory barriers for concurrent reads by the
+  // profile reporting thread. This method is the only one updating it and is not
+  // meant to be thread safe.
+  if (next_state != current_state) current_state_.Store(next_state);
+}
+
 void FragmentInstanceState::StopReportThread() {
   if (!report_thread_active_) return;
   {
@@ -426,6 +507,29 @@ void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
+string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type state) {
+  // Labels to send to the debug webpages to display the current state to the user.
+  static const string finstance_state_labels[] = {
+      "Waiting for Exec",         // WAITING_FOR_EXEC
+      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
+      "Waiting for Prepare",      // WAITING_FOR_PREPARE
+      "Waiting for First Batch",  // WAITING_FOR_OPEN
+      "Waiting for First Batch",  // WAITING_FOR_FIRST_BATCH
+      "First batch produced",     // FIRST_BATCH_PRODUCED
+      "Producing Data",           // PRODUCING_DATA
+      "Last batch sent",          // LAST_BATCH_SENT
+      "Finished"                  // FINISHED
+  };
+  /// Make sure we have a label for every possible state.
+  static_assert(
+      sizeof(finstance_state_labels) / sizeof(char*) == TFInstanceExecState::FINISHED + 1,
+      "");
+
+  DCHECK_LT(state, sizeof(finstance_state_labels) / sizeof(char*))
+      << "Unknown instance state";
+  return finstance_state_labels[state];
+}
+
 const TQueryCtx& FragmentInstanceState::query_ctx() const {
   return query_state_->query_ctx();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 4e832f6..292b93c 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -23,6 +23,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "common/status.h"
 #include "util/promise.h"
 
@@ -54,12 +55,14 @@ class RuntimeState;
 /// for this fragment instance and closes all data streams.
 ///
 /// The FIS makes an aggregated profile for the entire fragment available, which
-/// includes profile information for the plan itself as well as the output sink.
+/// includes profile information for the plan itself as well as the output sink. It also
+/// contains a timeline of events of the fragment instance.
 /// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the
-/// execution status and profile. The frequency of those reports is controlled by the flag
-/// status_report_interval; setting that flag to 0 disables periodic reporting altogether
-/// Regardless of the value of that flag, a report is sent at least once at the end of
-/// execution with an overall status and profile (and 'done' indicator).
+/// execution status, the current state of the execution, and the instance profile. The
+/// frequency of those reports is controlled by the flag status_report_interval; setting
+/// that flag to 0 disables periodic reporting altogether. Regardless of the value of that
+/// flag, a report is sent at least once at the end of execution with an overall status
+/// and profile (and 'done' indicator).
 /// The FIS will send at least one final status report. If execution ended with an error,
 /// that error status will be part of the final report (it will not be overridden by
 /// the resulting cancellation).
@@ -105,6 +108,9 @@ class FragmentInstanceState {
   /// the Prepare phase. May be nullptr.
   PlanRootSink* root_sink() { return root_sink_; }
 
+  /// Returns a string description of 'state'.
+  static string ExecStateToString(const TFInstanceExecState::type state);
+
   /// Name of the counter that is tracking per query, per host peak mem usage.
   /// TODO: this doesn't look like it belongs here
   static const std::string PER_HOST_PEAK_MEM_COUNTER;
@@ -117,6 +123,7 @@ class FragmentInstanceState {
   const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
+  TFInstanceExecState::type current_state() const { return current_state_.Load(); }
   const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
   ObjectPool* obj_pool();
 
@@ -155,6 +162,37 @@ class FragmentInstanceState {
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
 
+  /// Event sequence tracking the completion of various stages of this fragment instance.
+  /// Updated in UpdateState().
+  RuntimeProfile::EventSequence* event_sequence_ = nullptr;
+
+  /// Events that change the current state of this instance's execution, which is kept in
+  /// 'current_state_'. Events are issued throughout the execution by calling
+  /// UpdateState(), which implements a state machine. See the implementation of
+  /// UpdateState() for valid state transitions.
+  enum class StateEvent {
+    /// Indicates the start of execution.
+    PREPARE_START,
+    /// Indicates that codegen will get called. Omitted if not doing codegen.
+    CODEGEN_START,
+    /// Indicates the call to Open().
+    OPEN_START,
+    /// Indicates waiting for the first batch to arrive.
+    WAITING_FOR_FIRST_BATCH,
+    /// Indicates that a new batch was produced by this instance.
+    BATCH_PRODUCED,
+    /// Indicates that a batch has been sent.
+    BATCH_SENT,
+    /// Indicates that no new batches will be received.
+    LAST_BATCH_SENT,
+    /// Indicates the end of this instance's execution.
+    EXEC_END
+  };
+
+  /// The current state of this fragment instance's execution. Only updated by the
+  /// fragment instance thread in UpdateState() and read by the profile reporting threads.
+  AtomicEnum<TFInstanceExecState::type> current_state_;
+
   /// Output sink for rows sent to this fragment. Created in Prepare(), lives in
   /// obj_pool().
   DataSink* sink_ = nullptr;
@@ -232,6 +270,12 @@ class FragmentInstanceState {
   /// ReportProfileThread() thread will do periodically.
   void SendReport(bool done, const Status& status);
 
+  /// Handle the execution event 'event'. This implements a state machine and will update
+  /// the current execution state of this fragment instance. Also marks an event in
+  /// 'event_sequence_' for some states. Must not be called by multiple threads
+  /// concurrently.
+  void UpdateState(const StateEvent event);
+
   /// Called when execution is complete to finalize counters and send the final status
   /// report.  Must be called only once. Can handle partially-finished Prepare().
   void Finalize(const Status& status);

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 259cd34..5a11caf 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -227,6 +227,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     instance_status.__set_fragment_instance_id(fis->instance_id());
     status.SetTStatus(&instance_status);
     instance_status.__set_done(done);
+    instance_status.__set_current_state(fis->current_state());
 
     DCHECK(fis->profile() != nullptr);
     fis->profile()->ToThrift(&instance_status.profile);
@@ -304,6 +305,7 @@ void QueryState::StartFInstances() {
   DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
   TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
+  fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
     if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index f7b83a7..ae3bdd5 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -109,7 +109,7 @@ class QueryState {
   }
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
 
-  // the following getters are only valid after Prepare()
+  // the following getters are only valid after Init()
   ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
   InitialReservations* initial_reservations() const { return initial_reservations_; }
   TmpFileMgr::FileGroup* file_group() const { return file_group_; }
@@ -117,6 +117,7 @@ class QueryState {
 
   // the following getters are only valid after StartFInstances()
   const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
+  int64_t fragment_events_start_time() const { return fragment_events_start_time_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
   /// if resources could not be acquired. Acquires a resource refcount and returns it
@@ -243,6 +244,10 @@ class QueryState {
   /// "num-queries-spilled" metric.
   AtomicInt32 query_spilled_;
 
+  /// Records the point in time when fragment instances are started up. Set in
+  /// StartFInstances().
+  int64_t fragment_events_start_time_ = 0;
+
   /// Create QueryState w/ refcnt of 0.
   /// The query is associated with the resource pool query_ctx.request_pool or
   /// 'request_pool', if the former is not set (needed for tests).

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 33c5e73..b633f2a 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -104,6 +104,9 @@ void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) {
   webserver->RegisterUrlCallback("/query_backends", "query_backends.tmpl",
       MakeCallback(this, &ImpalaHttpHandler::QueryBackendsHandler), false);
 
+  webserver->RegisterUrlCallback("/query_finstances", "query_finstances.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::QueryFInstancesHandler), false);
+
   webserver->RegisterUrlCallback("/cancel_query", "common-pre.tmpl",
       MakeCallback(this, &ImpalaHttpHandler::CancelQueryHandler), false);
 
@@ -708,6 +711,25 @@ void ImpalaHttpHandler::QueryBackendsHandler(
   request_state->coord()->BackendsToJson(document);
 }
 
+void ImpalaHttpHandler::QueryFInstancesHandler(
+    const Webserver::ArgumentMap& args, Document* document) {
+  TUniqueId query_id;
+  Status status = ParseIdFromArguments(args, &query_id, "query_id");
+  Value query_id_val(PrintId(query_id).c_str(), document->GetAllocator());
+  document->AddMember("query_id", query_id_val, document->GetAllocator());
+  if (!status.ok()) {
+    // Redact the error message, it may contain part or all of the query.
+    Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator());
+    document->AddMember("error", json_error, document->GetAllocator());
+    return;
+  }
+
+  shared_ptr<ClientRequestState> request_state = server_->GetClientRequestState(query_id);
+  if (request_state.get() == nullptr || request_state->coord() == nullptr) return;
+
+  request_state->coord()->FInstanceStatsToJson(document);
+}
+
 void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include_summary,
     const Webserver::ArgumentMap& args, Document* document) {
   TUniqueId query_id;

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/service/impala-http-handler.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
index 8ad84bd..f2492ff 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -96,6 +96,11 @@ class ImpalaHttpHandler {
   void QueryBackendsHandler(
       const Webserver::ArgumentMap& args, rapidjson::Document* document);
 
+  /// If 'args' contains a query id, serializes all fragment instance states for all
+  /// backends for that query to 'document'.
+  void QueryFInstancesHandler(
+      const Webserver::ArgumentMap& args, rapidjson::Document* document);
+
   /// Cancels an in-flight query and writes the result to 'contents'.
   void CancelQueryHandler(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 9227e66..62281f1 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -19,10 +19,11 @@
 #ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 #define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 
+#include <algorithm>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
-#include <sys/time.h>
 #include <sys/resource.h>
+#include <sys/time.h>
 
 #include "common/atomic.h"
 #include "common/logging.h"
@@ -277,11 +278,10 @@ class RuntimeProfile::ThreadCounters {
   Counter* involuntary_context_switches_;
 };
 
-/// An EventSequence captures a sequence of events (each added by
-/// calling MarkEvent). Each event has a text label, and a time
-/// (measured relative to the moment Start() was called as t=0). It is
-/// useful for tracking the evolution of some serial process, such as
-/// the query lifecycle.
+/// An EventSequence captures a sequence of events (each added by calling MarkEvent()).
+/// Each event has a text label and a time (measured relative to the moment Start() was
+/// called as t=0, or to the parameter 'when' passed to Start(int64_t when)). It is useful
+/// for tracking the evolution of some serial process, such as the query lifecycle.
 class RuntimeProfile::EventSequence {
  public:
   EventSequence() { }
@@ -298,12 +298,20 @@ class RuntimeProfile::EventSequence {
   /// Starts the timer without resetting it.
   void Start() { sw_.Start(); }
 
+  /// Starts the timer. All events will be recorded as if the timer had been started at
+  /// 'start_time_ns', which must have been obtained by calling MonotonicStopWatch::Now().
+  void Start(int64_t start_time_ns) {
+    offset_ = MonotonicStopWatch::Now() - start_time_ns;
+    DCHECK_GE(offset_, 0);
+    sw_.Start();
+  }
+
   /// Stores an event in sequence with the given label and the current time
   /// (relative to the first time Start() was called) as the timestamp.
-  void MarkEvent(const std::string& label) {
-    Event event = make_pair(label, sw_.ElapsedTime());
+  void MarkEvent(std::string label) {
+    Event event = make_pair(move(label), sw_.ElapsedTime());
     boost::lock_guard<SpinLock> event_lock(lock_);
-    events_.push_back(event);
+    events_.emplace_back(move(event));
   }
 
   int64_t ElapsedTime() { return sw_.ElapsedTime(); }
@@ -311,34 +319,57 @@ class RuntimeProfile::EventSequence {
   /// An Event is a <label, timestamp> pair.
   typedef std::pair<std::string, int64_t> Event;
 
-  /// An EventList is a sequence of Events, in increasing timestamp order.
+  /// An EventList is a sequence of Events.
   typedef std::vector<Event> EventList;
 
-  /// Copies the member events_ into the supplied vector 'events'.
-  /// The supplied vector 'events' is cleared before this.
+  /// Returns a copy of 'events_' in the supplied vector 'events', sorted by their
+  /// timestamps. The supplied vector 'events' is cleared before this.
   void GetEvents(std::vector<Event>* events) {
     events->clear();
     boost::lock_guard<SpinLock> event_lock(lock_);
-    /// It's possible that concurrent events can be logged out of sequence.
-    /// So sort the events each time we are here.
+    /// It's possible that MarkEvent() logs concurrent events out of sequence so we sort
+    /// the events each time we are here.
+    SortEvents();
+    events->insert(events->end(), events_.begin(), events_.end());
+  }
+
+  /// Adds all events from the input that are newer than the last member of 'events_',
+  /// which must not be empty. The caller must make sure that 'timestamps' is sorted.
+  void AddNewerEvents(
+      const std::vector<int64_t>& timestamps, const std::vector<std::string>& labels) {
+    DCHECK_EQ(timestamps.size(), labels.size());
+    DCHECK(std::is_sorted(timestamps.begin(), timestamps.end()));
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    int64_t last_timestamp = events_.back().second;
+    for (int64_t i = 0; i < timestamps.size(); ++i) {
+      if (timestamps[i] <= last_timestamp) continue;
+      events_.push_back(make_pair(labels[i], timestamps[i]));
+    }
+  }
+
+  void ToThrift(TEventSequence* seq);
+
+ private:
+  /// Sorts events by their timestamp. Caller must hold lock_.
+  void SortEvents() {
     std::sort(events_.begin(), events_.end(),
         [](Event const &event1, Event const &event2) {
         return event1.second < event2.second;
       });
-    events->insert(events->end(), events_.begin(), events_.end());
   }
 
-  void ToThrift(TEventSequence* seq) const;
-
- private:
   /// Protect access to events_.
   SpinLock lock_;
 
-  /// Stored in increasing time order.
+  /// Sequence of events. Due to a race in MarkEvent() these are not necessarily ordered.
   EventList events_;
 
   /// Timer which allows events to be timestamped when they are recorded.
   MonotonicStopWatch sw_;
+
+  /// Constant offset that gets added to each event's timestamp. This allows events
+  /// captured in multiple threads to be synchronized to a common starting point.
+  int64_t offset_ = 0;
 };
 
 typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index a05e55c..c057cac 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -291,7 +291,6 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       if (it == time_series_counter_map_.end()) {
         time_series_counter_map_[c.name] =
             pool_->Add(new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
-        it = time_series_counter_map_.find(c.name);
       } else {
         it->second->samples_.SetSamples(c.period_ms, c.values);
       }
@@ -299,6 +298,20 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
   }
 
   {
+    lock_guard<SpinLock> l(event_sequence_lock_);
+    for (int i = 0; i < node.event_sequences.size(); ++i) {
+      const TEventSequence& seq = node.event_sequences[i];
+      EventSequenceMap::iterator it = event_sequence_map_.find(seq.name);
+      if (it == event_sequence_map_.end()) {
+        event_sequence_map_[seq.name] =
+            pool_->Add(new EventSequence(seq.timestamps, seq.labels));
+      } else {
+        it->second->AddNewerEvents(seq.timestamps, seq.labels);
+      }
+    }
+  }
+
+  {
     lock_guard<SpinLock> l(summary_stats_map_lock_);
     for (int i = 0; i < node.summary_stats_counters.size(); ++i) {
       const TSummaryStatsCounter& c = node.summary_stats_counters[i];
@@ -1058,7 +1071,11 @@ string RuntimeProfile::TimeSeriesCounter::DebugString() const {
   return ss.str();
 }
 
-void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) const {
+void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
+  lock_guard<SpinLock> l(lock_);
+  /// It's possible that concurrent events can be logged out of sequence so we sort the
+  /// events before serializing them.
+  SortEvents();
   for (const EventSequence::Event& ev: events_) {
     seq->labels.push_back(ev.first);
     seq->timestamps.push_back(ev.second);

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/be/src/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index c1f85aa..443b655 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -145,20 +145,8 @@ class MonotonicStopWatch {
     return total_time_;
   }
 
- private:
-  /// Start epoch value.
-  uint64_t start_;
-
-  /// Total elapsed time in nanoseconds.
-  uint64_t total_time_;
-
-  /// Upper bound of the running time as a epoch value. If the value is larger than 0,
-  /// the stopwatch interprets this as a time ceiling is set.
-  uint64_t time_ceiling_;
-
-  /// True if stopwatch is running.
-  bool running_;
-
+  /// Returns a representation of the current time in nanoseconds. It can be used to
+  /// measure time durations by repeatedly calling this function and comparing the result.
   /// While this function returns nanoseconds, its resolution may be as large as
   /// milliseconds, depending on OsInfo::fast_clock().
   static inline int64_t Now() {
@@ -174,6 +162,20 @@ class MonotonicStopWatch {
 #endif
   }
 
+ private:
+  /// Start epoch value.
+  uint64_t start_;
+
+  /// Total elapsed time in nanoseconds.
+  uint64_t total_time_;
+
+  /// Upper bound of the running time as an epoch value. If the value is larger than 0,
+  /// the stopwatch interprets this as a time ceiling being set.
+  uint64_t time_ceiling_;
+
+  /// True if stopwatch is running.
+  bool running_;
+
   /// Returns the time since start.
   /// If time_ceiling_ is set, the stop watch won't run pass the ceiling.
   uint64_t RunningTime() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index a584ce1..121c551 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -50,7 +50,7 @@ enum TParquetFallbackSchemaResolution {
   NAME
 }
 
-// The order of the enum values needs to be kepy in sync with
+// The order of the enum values needs to be kept in sync with
 // ParquetMetadataUtils::ORDERED_ARRAY_ENCODINGS in parquet-metadata-utils.cc.
 enum TParquetArrayResolution {
   THREE_LEVEL,
@@ -606,6 +606,21 @@ struct TErrorLogEntry {
   2: list<string> messages
 }
 
+// Represents the states that a fragment instance goes through during its execution. The
+// current state gets sent back to the coordinator and will be presented to users through
+// the debug webpages.
+enum TFInstanceExecState {
+  WAITING_FOR_EXEC,
+  WAITING_FOR_CODEGEN,
+  WAITING_FOR_PREPARE,
+  WAITING_FOR_OPEN,
+  WAITING_FOR_FIRST_BATCH,
+  FIRST_BATCH_PRODUCED,
+  PRODUCING_DATA,
+  LAST_BATCH_SENT,
+  FINISHED
+}
+
 struct TFragmentInstanceExecStatus {
   // required in V1
   1: optional Types.TUniqueId fragment_instance_id
@@ -621,6 +636,10 @@ struct TFragmentInstanceExecStatus {
   // cumulative profile
   // required in V1
   4: optional RuntimeProfile.TRuntimeProfileTree profile
+
+  // The current state of this fragment instance's execution.
+  // required in V1
+  5: optional TFInstanceExecState current_state
 }
 
 struct TReportExecStatusParams {

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index e8599b5..85fc4f1 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -181,3 +181,16 @@ class TestObservability(ImpalaTestSuite):
     dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
       query_id, MAX_WAIT)
     assert False, dbg_str
+
+  def test_query_profile_contains_instance_events(self, unique_database):
+    """Test that /query_profile_encoded contains an event timeline for fragment
+    instances, even when there are errors."""
+    events = ["Fragment Instance Lifecycle Event Timeline",
+              "Prepare Finished",
+              "First Batch Produced",
+              "First Batch Sent",
+              "ExecInternal Finished"]
+    query = "select count(*) from functional.alltypes"
+    runtime_profile = self.execute_query(query).runtime_profile
+    for event in events:
+      assert event in runtime_profile

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 54163dc..8dd17a4 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -31,6 +31,7 @@ class TestWebPage(ImpalaTestSuite):
   CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
   TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
   QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
+  QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
   THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -173,7 +174,21 @@ class TestWebPage(ImpalaTestSuite):
     self.get_and_check_status(self.TABLE_METRICS_URL +
       "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT)
 
-  def test_query_details(self, unique_database):
+  def __run_query_and_get_debug_page(self, query, page_url):
+    """Runs a query to obtain the content of the debug page pointed to by page_url, then
+    cancels the query."""
+    query_handle =  self.client.execute_async(query)
+    response_json = ""
+    try:
+      response = self.get_and_check_status(
+        page_url + "?query_id=%s&json" % query_handle.get_handle().id,
+        ports_to_test=[25000])
+      response_json = json.loads(response)
+    finally:
+      self.client.cancel(query_handle)
+    return response_json
+
+  def test_backend_states(self, unique_database):
     """Test that /query_backends returns the list of backend states for DML or queries;
     nothing for DDL statements"""
     CROSS_JOIN = ("select count(*) from functional.alltypes a "
@@ -181,20 +196,27 @@ class TestWebPage(ImpalaTestSuite):
     for q in [CROSS_JOIN,
               "CREATE TABLE {0}.foo AS {1}".format(unique_database, CROSS_JOIN),
               "DESCRIBE functional.alltypes"]:
-      query_handle =  self.client.execute_async(q)
-      try:
-        response = self.get_and_check_status(
-          self.QUERY_BACKENDS_URL + "?query_id=%s&json" % query_handle.get_handle().id,
-          ports_to_test=[25000])
+      response_json = self.__run_query_and_get_debug_page(q, self.QUERY_BACKENDS_URL)
 
-        response_json = json.loads(response)
+      if "DESCRIBE" not in q:
+        assert len(response_json['backend_states']) > 0
+      else:
+        assert 'backend_states' not in response_json
 
-        if "DESCRIBE" not in q:
-          assert len(response_json['backend_states']) > 0
-        else:
-          assert 'backend_states' not in response_json
-      finally:
-        self.client.cancel(query_handle)
+  def test_backend_instances(self, unique_database):
+    """Test that /query_finstances returns the list of fragment instances for DML or
+    queries; nothing for DDL statements"""
+    CROSS_JOIN = ("select count(*) from functional.alltypes a "
+                  "CROSS JOIN functional.alltypes b CROSS JOIN functional.alltypes c")
+    for q in [CROSS_JOIN,
+              "CREATE TABLE {0}.foo AS {1}".format(unique_database, CROSS_JOIN),
+              "DESCRIBE functional.alltypes"]:
+      response_json = self.__run_query_and_get_debug_page(q, self.QUERY_FINSTANCES_URL)
+
+      if "DESCRIBE" not in q:
+        assert len(response_json['backend_instances']) > 0
+      else:
+        assert 'backend_instances' not in response_json
 
   def test_io_mgr_threads(self):
     """Test that IoMgr threads have readable names. This test assumed that all systems we
@@ -207,4 +229,3 @@ class TestWebPage(ImpalaTestSuite):
     for pattern in expected_name_patterns:
       assert any(pattern in t for t in thread_names), \
            "Could not find thread matching '%s'" % pattern
-

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/www/query_detail_tabs.tmpl
----------------------------------------------------------------------
diff --git a/www/query_detail_tabs.tmpl b/www/query_detail_tabs.tmpl
index 0318761..8e413a1 100644
--- a/www/query_detail_tabs.tmpl
+++ b/www/query_detail_tabs.tmpl
@@ -28,4 +28,5 @@ under the License.
   <li id="profile-tab" role="presentation"><a href="/query_profile?query_id={{query_id}}">Profile</a></li>
   <li id="memory-tab" role="presentation"><a href="/query_memory?query_id={{query_id}}">Memory</a></li>
   <li id="backends-tab" role="presentation"><a href="/query_backends?query_id={{query_id}}">Backends</a></li>
+  <li id="finstances-tab" role="presentation"><a href="/query_finstances?query_id={{query_id}}">Fragment Instances</a></li>
 </ul>

http://git-wip-us.apache.org/repos/asf/impala/blob/be0f4f9d/www/query_finstances.tmpl
----------------------------------------------------------------------
diff --git a/www/query_finstances.tmpl b/www/query_finstances.tmpl
new file mode 100644
index 0000000..5d8ba33
--- /dev/null
+++ b/www/query_finstances.tmpl
@@ -0,0 +1,129 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+{{> www/common-header.tmpl }}
+{{> www/query_detail_tabs.tmpl }}
+<br/>
+{{?backend_instances}}
+<div>
+  <label>
+    <input type="checkbox" checked="true" id="toggle" onClick="toggleRefresh()"/>
+    <span id="refresh_on">Auto-refresh on</span>
+  </label>  Last updated: <span id="last-updated"></span>
+</div>
+
+<br/>
+<table id="finstances" class='table table-hover table-bordered'>
+  <thead>
+    <tr>
+      <th>Host</th>
+      <th>Fragment<br/>Name</th>
+      <th>Instance ID</th>
+      <th>Current state</th>
+      <th>Done</th>
+      <th>Time since last report (ms)</th>
+    </tr>
+  </thead>
+  <tbody>
+
+  </tbody>
+</table>
+
+<script>
+document.getElementById("finstances-tab").className = "active";
+
+var intervalId = 0;
+var table = null;
+var refresh = function () {
+    table.ajax.reload();
+    document.getElementById("last-updated").textContent = new Date();
+};
+
+// Unpack Json backend_instances by merging the backend host name into every instance stats
+// row. Also clears the last report timestamp field for instances that have not started or
+// have already finished execution.
+function unpackJson(json) {
+    var result = new Array();
+    if (typeof json.backend_instances === "undefined") {
+        // Table will be empty, remove it.
+        table.table().destroy(true);
+        $("#finstances").remove();
+        // Display completion message.
+        $("#query_finished_alert").css("visibility", "visible");
+        // Stop auto refresh
+        $("#toggle").prop("checked", false);
+        toggleRefresh();
+        return json;
+    }
+    for (var i = 0; i < json.backend_instances.length; ++i) {
+        var backend_state = json.backend_instances[i];
+        var instance_stats = backend_state.instance_stats;
+        for (var j = 0; j < instance_stats.length; ++j) {
+            var instance = instance_stats[j];
+            instance.host = backend_state.host;
+            if (instance.done) instance.time_since_last_heard_from = "";
+            if (!instance.first_status_update_received) {
+              instance.time_since_last_heard_from = "";
+            }
+            delete instance.first_status_update_received;
+            result.push(instance);
+        }
+    }
+    return result;
+}
+
+$(document).ready(function() {
+    table = $('#finstances').DataTable({
+        ajax: { url: "/query_finstances?query_id={{query_id}}&json",
+                dataSrc: unpackJson,
+              },
+        "columns": [ {data: 'host'},
+                     {data: 'fragment_name'},
+                     {data: 'instance_id'},
+                     {data: 'current_state'},
+                     {data: 'done'},
+                     {data: 'time_since_last_heard_from'}],
+        "order": [[ 0, "desc" ]],
+        "pageLength": 100
+    });
+    intervalId = setInterval(refresh, 1000);
+});
+
+function toggleRefresh() {
+    if (document.getElementById("toggle").checked == true) {
+        intervalId = setInterval(refresh, 1000);
+        document.getElementById("refresh_on").textContent = "Auto-refresh on";
+    } else {
+        clearInterval(intervalId);
+        document.getElementById("refresh_on").textContent = "Auto-refresh off";
+    }
+}
+
+</script>
+{{/backend_instances}}
+
+<div class="alert alert-info" role="alert" id="query_finished_alert"
+     style="visibility:hidden">
+Query <strong>{{query_id}}</strong> has completed, or has not started any backends, yet.
+</div>
+{{^backend_instances}}
+<script>$("#query_finished_alert").css("visibility", "visible");</script>
+{{/backend_instances}}
+
+{{> www/common-footer.tmpl }}