You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/06/12 04:08:23 UTC

[impala] branch master updated: IMPALA-9752: aggregate profile stats on executor

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 67b4764  IMPALA-9752: aggregate profile stats on executor
67b4764 is described below

commit 67b4764853ed08b1861bcc854adc047ae9420676
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Jun 8 09:40:29 2020 -0700

    IMPALA-9752: aggregate profile stats on executor
    
    Before this change the coordinator depended on getting the full
    fragment instance profiles from executors to pull out various
    things. This change removes that dependency by pulling out the
    information on the executor, and including it in the status
    report protobuf. This should slightly reduce the amount of work
    done on the coordinator, but more importantly, makes it easier
    to switch to sending aggregated profiles from executor to
    coordinator, because the coordinator no longer depends on
    receiving individual instance profiles.
    
    Per-host peak memory is included directly in the status report.
    
    Per-backend stats - where the per-backend total is needed -
    are aggregated on the executor with the result included in the
    status report. These counters are: BytesRead, ScanRangesComplete,
    TotalBytesSent, TotalThreads{User,Sys}Time.
    
    One subtlety to keep in mind that status reports don't include
    stats for instances where the final update was sent in a previous
    status report. So the executor needs to ensure that stats for
    finished fragment instances are included in updates. This is
    achieved by caching those values in FragmentInstanceState.
    
    The stats used in the exec summary were previously also plucked
    out of the profile on the coordinator. This change moves the work
    to the executor, and includes the per-node stats in the status
    report.
    
    I did a little cleanup of the profile counter declarations,
    making sure they were consistently inside the impala namespace
    in the files that I touched.
    
    Testing:
    Ran core tests.
    
    Manually checked exec summary, query profiles and backends
    page for a running query.
    
    Change-Id: Ia2aca354d803ce78a798a1a64f9f98353b813e4a
    Reviewed-on: http://gerrit.cloudera.org:8080/16050
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc          |   3 +-
 be/src/exec/kudu-scan-node-base.cc          |   4 +-
 be/src/runtime/coordinator-backend-state.cc | 125 ++++++++--------------------
 be/src/runtime/coordinator-backend-state.h  |  51 +++++-------
 be/src/runtime/coordinator.cc               |   4 +-
 be/src/runtime/fragment-instance-state.cc   |  52 +++++++++++-
 be/src/runtime/fragment-instance-state.h    |  22 +++++
 be/src/runtime/query-state.cc               |  41 +++++++--
 common/protobuf/control_service.proto       |  45 +++++++++-
 9 files changed, 213 insertions(+), 134 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b536fad..1ef9207 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -68,10 +68,10 @@ DEFINE_bool(always_use_data_cache, false, "(Advanced) Always uses the IO data ca
     "IO data cache is only used if the data is expected to be remote. Used by tests.");
 
 namespace filesystem = boost::filesystem;
-using namespace impala;
 using namespace impala::io;
 using namespace strings;
 
+namespace impala {
 PROFILE_DEFINE_TIMER(TotalRawHdfsReadTime, STABLE_LOW, "Aggregate wall clock time"
     " across all Disk I/O threads in HDFS read operations.");
 PROFILE_DEFINE_TIMER(TotalRawHdfsOpenFileTime, STABLE_LOW, "Aggregate wall clock time"
@@ -1252,3 +1252,4 @@ void ScanRangeSharedState::AddCancellationHook(RuntimeState* state) {
   DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes";
   state->AddCancellationCV(&scan_range_submission_lock_, &range_submission_cv_);
 }
+}
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index a047e75..0de04de 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -42,10 +42,10 @@
 using kudu::client::KuduClient;
 using kudu::client::KuduTable;
 
-PROFILE_DECLARE_COUNTER(ScanRangesComplete);
-
 namespace impala {
 
+PROFILE_DECLARE_COUNTER(ScanRangesComplete);
+
 const string KuduScanNodeBase::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
 const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
 ///   KuduClientTime - total amount of time scanner threads spent in the Kudu
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 01208ae..db24acb 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -58,10 +58,8 @@ namespace accumulators = boost::accumulators;
 
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
-PROFILE_DECLARE_COUNTER(ScanRangesComplete);
 
 namespace impala {
-PROFILE_DECLARE_COUNTER(BytesRead);
 
 const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
     "Last report received time";
@@ -338,48 +336,15 @@ Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
   return status_;
 }
 
-Coordinator::ResourceUtilization Coordinator::BackendState::ComputeResourceUtilization() {
+Coordinator::ResourceUtilization Coordinator::BackendState::GetResourceUtilization() {
   lock_guard<mutex> l(lock_);
   DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
-  return ComputeResourceUtilizationLocked();
+  return GetResourceUtilizationLocked();
 }
 
 Coordinator::ResourceUtilization
-Coordinator::BackendState::ComputeResourceUtilizationLocked() {
-  ResourceUtilization result;
-  for (const auto& entry : instance_stats_map_) {
-    RuntimeProfile* profile = entry.second->profile_;
-    ResourceUtilization instance_utilization;
-    // Update resource utilization and apply delta.
-    RuntimeProfile::Counter* user_time = profile->GetCounter("TotalThreadsUserTime");
-    if (user_time != nullptr) instance_utilization.cpu_user_ns = user_time->value();
-
-    RuntimeProfile::Counter* system_time = profile->GetCounter("TotalThreadsSysTime");
-    if (system_time != nullptr) instance_utilization.cpu_sys_ns = system_time->value();
-
-    for (RuntimeProfile::Counter* c : entry.second->bytes_read_counters_) {
-      instance_utilization.bytes_read += c->value();
-    }
-
-    int64_t bytes_sent = 0;
-    for (RuntimeProfile::Counter* c : entry.second->bytes_sent_counters_) {
-      bytes_sent += c->value();
-    }
-
-    // Determine whether this instance had a scan node in its plan.
-    if (instance_utilization.bytes_read > 0) {
-      instance_utilization.scan_bytes_sent = bytes_sent;
-    } else {
-      instance_utilization.exchange_bytes_sent = bytes_sent;
-    }
-
-    RuntimeProfile::Counter* peak_mem =
-        profile->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
-    if (peak_mem != nullptr)
-      instance_utilization.peak_per_host_mem_consumption = peak_mem->value();
-    result.Merge(instance_utilization);
-  }
-  return result;
+Coordinator::BackendState::GetResourceUtilizationLocked() {
+  return backend_utilization_;
 }
 
 void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
@@ -474,8 +439,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     }
 
     DCHECK(!instance_stats->done_);
-    instance_stats->Update(instance_exec_status, *profile_iter, exec_summary,
-        scan_range_progress);
+    instance_stats->Update(instance_exec_status, *profile_iter, exec_summary);
 
     // Update DML stats
     if (instance_exec_status.has_dml_exec_status()) {
@@ -508,6 +472,20 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       --num_remaining_instances_;
     }
   }
+  // Determine newly-completed scan ranges and update scan_range_progress.
+  int64_t scan_ranges_complete = backend_exec_status.scan_ranges_complete();
+  int64_t scan_range_delta = scan_ranges_complete - total_ranges_complete_;
+  DCHECK_GE(scan_range_delta, 0);
+  scan_range_progress->Update(scan_range_delta);
+  total_ranges_complete_ = scan_ranges_complete;
+
+  backend_utilization_.peak_per_host_mem_consumption =
+      backend_exec_status.peak_mem_consumption();
+  backend_utilization_.cpu_user_ns = backend_exec_status.cpu_user_ns();
+  backend_utilization_.cpu_sys_ns = backend_exec_status.cpu_sys_ns();
+  backend_utilization_.bytes_read = backend_exec_status.bytes_read();
+  backend_utilization_.exchange_bytes_sent = backend_exec_status.exchange_bytes_sent();
+  backend_utilization_.scan_bytes_sent = backend_exec_status.scan_bytes_sent();
 
   // status_ has incorporated the status from all fragment instances. If the overall
   // backend status is not OK, but no specific fragment instance reported an error, then
@@ -732,26 +710,9 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
   (*fragment_stats->bytes_assigned())(total_split_size_);
 }
 
-void Coordinator::BackendState::InstanceStats::InitCounters() {
-  vector<RuntimeProfile*> children;
-  profile_->GetAllChildren(&children);
-  for (RuntimeProfile* p : children) {
-    RuntimeProfile::Counter* c = p->GetCounter(PROFILE_ScanRangesComplete.name());
-    if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
-
-    RuntimeProfile::Counter* bytes_read = p->GetCounter(PROFILE_BytesRead.name());
-    if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
-
-    RuntimeProfile::Counter* bytes_sent =
-        p->GetCounter(KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER);
-    if (bytes_sent != nullptr) bytes_sent_counters_.push_back(bytes_sent);
-  }
-}
-
 void Coordinator::BackendState::InstanceStats::Update(
     const FragmentInstanceExecStatusPB& exec_status,
-    const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
-    ProgressUpdater* scan_range_progress) {
+    const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary) {
   last_report_time_ms_ = UnixMillis();
   DCHECK_GT(exec_status.report_seq_no(), last_report_seq_no_);
   last_report_seq_no_ = exec_status.report_seq_no();
@@ -759,55 +720,41 @@ void Coordinator::BackendState::InstanceStats::Update(
   profile_->UpdateInfoString(LAST_REPORT_TIME_DESC,
       ToStringFromUnixMillis(last_report_time_ms_));
   profile_->Update(thrift_profile);
-  if (!profile_created_) {
-    profile_created_ = true;
-    InitCounters();
-  }
   profile_->ComputeTimeInProfile();
 
   // update exec_summary
-  // TODO: why do this every time we get an updated instance profile?
-  vector<RuntimeProfile*> children;
-  profile_->GetAllChildren(&children);
   TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary;
-  for (RuntimeProfile* child : children) {
-    bool is_plan_node = child->metadata().__isset.plan_node_id;
-    bool is_data_sink = child->metadata().__isset.data_sink_id;
-    // Plan Nodes and data sinks get an entry in the summary.
-    if (!is_plan_node && !is_data_sink) continue;
-
+  for (const ExecSummaryDataPB& exec_summary_entry : exec_status.exec_summary_data()) {
+    bool is_plan_node = exec_summary_entry.has_plan_node_id();
+    bool is_data_sink = exec_summary_entry.has_data_sink_id();
+    DCHECK(is_plan_node || is_data_sink) << "Invalid exec summary entry sent by executor";
     int exec_summary_idx;
     if (is_plan_node) {
-      exec_summary_idx = exec_summary->node_id_to_idx_map[child->metadata().plan_node_id];
+      exec_summary_idx =
+          exec_summary->node_id_to_idx_map[exec_summary_entry.plan_node_id()];
     } else {
       exec_summary_idx =
-          exec_summary->data_sink_id_to_idx_map[child->metadata().data_sink_id];
+          exec_summary->data_sink_id_to_idx_map[exec_summary_entry.data_sink_id()];
     }
     TPlanNodeExecSummary& node_exec_summary = thrift_exec_summary.nodes[exec_summary_idx];
     DCHECK_EQ(node_exec_summary.fragment_idx, exec_params_.fragment().idx);
     int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx;
     DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size())
-        << " name=" << child->name()
         << " instance_id=" << PrintId(exec_params_.instance_id)
         << " fragment_idx=" << exec_params_.fragment().idx;
     TExecStats& instance_stats = node_exec_summary.exec_stats[per_fragment_instance_idx];
 
-    RuntimeProfile::Counter* rows_counter = child->GetCounter("RowsReturned");
-    RuntimeProfile::Counter* mem_counter = child->GetCounter("PeakMemoryUsage");
-    if (rows_counter != nullptr) instance_stats.__set_cardinality(rows_counter->value());
-    if (mem_counter != nullptr) instance_stats.__set_memory_used(mem_counter->value());
-    instance_stats.__set_latency_ns(child->local_time());
-    // TODO: track interesting per-node metrics
+    if (exec_summary_entry.has_rows_returned()) {
+      instance_stats.__set_cardinality(exec_summary_entry.rows_returned());
+    }
+    if (exec_summary_entry.has_peak_mem_usage()) {
+      instance_stats.__set_memory_used(exec_summary_entry.peak_mem_usage());
+    }
+    DCHECK(exec_summary_entry.has_local_time_ns());
+    instance_stats.__set_latency_ns(exec_summary_entry.local_time_ns());
     node_exec_summary.__isset.exec_stats = true;
   }
 
-  // determine newly-completed scan ranges and update scan_range_progress
-  int64_t total = 0;
-  for (RuntimeProfile::Counter* c: scan_ranges_complete_counters_) total += c->value();
-  int64_t delta = total - total_ranges_complete_;
-  total_ranges_complete_ = total;
-  scan_range_progress->Update(delta);
-
   // extract the current execution state of this instance
   current_state_ = exec_status.current_state();
 }
@@ -891,7 +838,7 @@ void Coordinator::FragmentStats::AddExecStats() {
 void Coordinator::BackendState::ToJson(Value* value, Document* document) {
   unique_lock<mutex> l(lock_);
   DCHECK(exec_done_) << "May only be called after WaitOnExecRpc() completes.";
-  ResourceUtilization resource_utilization = ComputeResourceUtilizationLocked();
+  ResourceUtilization resource_utilization = GetResourceUtilizationLocked();
   value->AddMember("num_instances", fragments_.size(), document->GetAllocator());
   value->AddMember("done", IsDoneLocked(l), document->GetAllocator());
   value->AddMember("peak_per_host_mem_consumption",
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index d1702eb..d92ca21 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -136,12 +136,13 @@ class Coordinator::BackendState {
   /// Update overall execution status, including the instances' exec status/profiles
   /// and the error log, if this backend is not already done. Updates the fragment
   /// instances' TExecStats in exec_summary (exec_summary->nodes.exec_stats) and updates
-  /// progress_update. If any instance reports an error, the overall execution status
-  /// becomes the first reported error status. Returns true iff this update changed
-  /// IsDone() from false to true, either because it was the last fragment to complete or
-  /// because it was the first error received. Adds the AuxErrorInfoPB from each
-  /// FragmentInstanceExecStatusPB in backend_exec_status to the vector
-  /// aux_error_info.
+  /// scan_range_progress with any newly-completed scan ranges.
+  ///
+  /// If any instance reports an error, the overall execution status becomes the first
+  /// reported error status. Returns true iff this update changed IsDone() from false
+  /// to true, either because it was the last fragment to complete or because it was
+  /// the first error received. Adds the AuxErrorInfoPB from each
+  /// FragmentInstanceExecStatusPB in backend_exec_status to the vector aux_error_info.
   bool ApplyExecStatusReport(const ReportExecStatusRequestPB& backend_exec_status,
       const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
       ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state,
@@ -177,7 +178,7 @@ class Coordinator::BackendState {
 
   /// Return peak memory consumption and aggregated resource usage across all fragment
   /// instances for this backend.
-  ResourceUtilization ComputeResourceUtilization();
+  ResourceUtilization GetResourceUtilization();
 
   /// Merge the accumulated error log into 'merged'.
   void MergeErrorLog(ErrorLogMap* merged);
@@ -226,12 +227,10 @@ class Coordinator::BackendState {
         ObjectPool* obj_pool);
 
     /// Updates 'this' with exec_status and the fragment intance's thrift profile. Also
-    /// updates the fragment instance's TExecStats in exec_summary and 'progress_updater'
-    /// with the number of newly completed scan ranges. Also updates the instance's avg
-    /// profile. Caller must hold BackendState::lock_.
+    /// updates the fragment instance's TExecStats in exec_summary. Also updates the
+    /// instance's avg profile. Caller must hold BackendState::lock_.
     void Update(const FragmentInstanceExecStatusPB& exec_status,
-        const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
-        ProgressUpdater* scan_range_progress);
+        const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary);
 
     int per_fragment_instance_idx() const {
       return exec_params_.per_fragment_instance_idx;
@@ -270,36 +269,18 @@ class Coordinator::BackendState {
     /// in ApplyExecStatusReport()
     bool done_ = false;
 
-    /// true after the first call to profile->Update()
-    bool profile_created_ = false;
-
     /// cumulative size of all splits; set in c'tor
     int64_t total_split_size_ = 0;
 
     /// wall clock timer for this instance
     MonotonicStopWatch stopwatch_;
 
-    /// total scan ranges complete across all scan nodes
-    int64_t total_ranges_complete_ = 0;
-
-    /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
-    std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
-
-    /// Collection of BYTES_READ_COUNTERs of all scan nodes in this fragment instance.
-    std::vector<RuntimeProfile::Counter*> bytes_read_counters_;
-
-    /// Collection of TotalBytesSent of all data stream senders in this fragment instance.
-    std::vector<RuntimeProfile::Counter*> bytes_sent_counters_;
-
     /// Descriptor string for the last query status report time in the profile.
     static const char* LAST_REPORT_TIME_DESC;
 
     /// The current state of this fragment instance's execution. This gets serialized in
     /// ToJson() and is displayed in the debug webpages.
     FInstanceExecStatePB current_state_ = FInstanceExecStatePB::WAITING_FOR_EXEC;
-
-    /// Extracts scan_ranges_complete_counters_ and  bytes_read_counters_ from profile_.
-    void InitCounters();
   };
 
   /// QuerySchedule associated with the Coordinator that owns this BackendState.
@@ -392,6 +373,12 @@ class Coordinator::BackendState {
   /// True if a CancelQueryFInstances RPC was already sent to this backend.
   bool sent_cancel_rpc_ = false;
 
+  /// Total scan ranges complete across all scan nodes. Set in ApplyExecStatusReport().
+  int64_t total_ranges_complete_ = 0;
+
+  /// Resource utilization values. Set in ApplyExecStatusReport().
+  ResourceUtilization backend_utilization_;
+
   /// END: Members that are protected by 'lock_'.
   /////////////////////////////////////////
 
@@ -417,8 +404,8 @@ class Coordinator::BackendState {
   /// Version of IsDone() where caller must hold lock_ via lock;
   bool IsDoneLocked(const std::unique_lock<std::mutex>& lock) const;
 
-  /// Same as ComputeResourceUtilization() but caller must hold lock.
-  ResourceUtilization ComputeResourceUtilizationLocked();
+  /// Same as GetResourceUtilization() but caller must hold lock.
+  ResourceUtilization GetResourceUtilizationLocked();
 
   /// Logs 'msg' at the VLOG_QUERY level, along with 'query_id_' and 'krpc_host_'.
   void VLogForBackend(const std::string& msg);
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 91415ce..4b502e7 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1129,7 +1129,7 @@ void Coordinator::ComputeQuerySummary() {
   stringstream mem_info, cpu_user_info, cpu_system_info, bytes_read_info;
   ResourceUtilization total_utilization;
   for (BackendState* backend_state: backend_states_) {
-    ResourceUtilization utilization = backend_state->ComputeResourceUtilization();
+    ResourceUtilization utilization = backend_state->GetResourceUtilization();
     total_utilization.Merge(utilization);
     string network_address = NetworkAddressPBToString(backend_state->impalad_address());
     mem_info << network_address << "("
@@ -1247,7 +1247,7 @@ Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization()
   DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first";
   ResourceUtilization query_resource_utilization;
   for (BackendState* backend_state: backend_states_) {
-    query_resource_utilization.Merge(backend_state->ComputeResourceUtilization());
+    query_resource_utilization.Merge(backend_state->GetResourceUtilization());
   }
   return query_resource_utilization;
 }
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 09db8f6..3322003 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -40,6 +40,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/fragment-state.h"
 #include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/row-batch.h"
@@ -56,9 +57,10 @@
 
 using google::protobuf::RepeatedPtrField;
 using kudu::rpc::RpcContext;
-using namespace impala;
 using namespace apache::thrift;
 
+namespace impala {
+
 const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
 const string FragmentInstanceState::FINST_THREAD_GROUP_NAME = "fragment-execution";
 const string FragmentInstanceState::FINST_THREAD_NAME_PREFIX = "exec-finstance";
@@ -67,6 +69,9 @@ static const string OPEN_TIMER_NAME = "OpenTime";
 static const string PREPARE_TIMER_NAME = "PrepareTime";
 static const string EXEC_TIMER_NAME = "ExecTime";
 
+PROFILE_DECLARE_COUNTER(ScanRangesComplete);
+PROFILE_DECLARE_COUNTER(BytesRead);
+
 FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
     FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx,
     const PlanFragmentInstanceCtxPB& instance_ctx_pb)
@@ -277,6 +282,50 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
   instance_status->set_current_state(current_state());
   DCHECK(profile() != nullptr);
   profile()->ToThrift(thrift_profile);
+
+  // Pull out and aggregate counters from the profile.
+  RuntimeProfile::Counter* user_time = profile()->GetCounter("TotalThreadsUserTime");
+  if (user_time != nullptr) cpu_user_ns_ = user_time->value();
+
+  RuntimeProfile::Counter* system_time = profile()->GetCounter("TotalThreadsSysTime");
+  if (system_time != nullptr) cpu_sys_ns_ = system_time->value();
+
+  // Compute local_time for use below.
+  profile()->ComputeTimeInProfile();
+  vector<RuntimeProfile*> nodes;
+  profile()->GetAllChildren(&nodes);
+  int64_t bytes_read = 0;
+  int64_t scan_ranges_complete = 0;
+  int64_t total_bytes_sent = 0;
+  for (RuntimeProfile* node : nodes) {
+    RuntimeProfile::Counter* c = node->GetCounter(PROFILE_BytesRead.name());
+    if (c != nullptr) bytes_read += c->value();
+    c = node->GetCounter(PROFILE_ScanRangesComplete.name());
+    if (c != nullptr) scan_ranges_complete += c->value();
+    c = node->GetCounter(KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER);
+    if (c != nullptr) total_bytes_sent += c->value();
+
+    bool is_plan_node = node->metadata().__isset.plan_node_id;
+    bool is_data_sink = node->metadata().__isset.data_sink_id;
+    // Plan Nodes and data sinks get an entry in the exec summary.
+    if (is_plan_node || is_data_sink) {
+      ExecSummaryDataPB* summary_data = instance_status->add_exec_summary_data();
+      if (is_plan_node) {
+        summary_data->set_plan_node_id(node->metadata().plan_node_id);
+      } else {
+        summary_data->set_data_sink_id(node->metadata().data_sink_id);
+      }
+      RuntimeProfile::Counter* rows_counter = node->GetCounter("RowsReturned");
+      RuntimeProfile::Counter* mem_counter = node->GetCounter("PeakMemoryUsage");
+      if (rows_counter != nullptr) summary_data->set_rows_returned(rows_counter->value());
+      if (mem_counter != nullptr) summary_data->set_peak_mem_usage(mem_counter->value());
+      summary_data->set_local_time_ns(node->local_time());
+    }
+  }
+  bytes_read_ = bytes_read;
+  scan_ranges_complete_ = scan_ranges_complete;
+  total_bytes_sent_  = total_bytes_sent;
+
   // Send the DML stats if this is the final report.
   if (done) {
     runtime_state()->dml_exec_state()->ToProto(
@@ -580,3 +629,4 @@ void FragmentInstanceState::PrintVolumeIds() {
       << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
       << PrintId(query_id()) << ":\n" << str.str();
 }
+}
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 0737050..ac8a73e 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -143,6 +143,12 @@ class FragmentInstanceState {
   const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
   bool IsDone() const { return current_state_.Load() == FInstanceExecStatePB::FINISHED; }
   ObjectPool* obj_pool();
+  int64_t scan_ranges_complete() const { return scan_ranges_complete_; }
+  int64_t peak_mem_consumption() const { return peak_mem_consumption_; }
+  int64_t cpu_user_ns() const { return cpu_user_ns_; }
+  int64_t cpu_sys_ns() const { return cpu_sys_ns_; }
+  int64_t bytes_read() const { return bytes_read_; }
+  int64_t total_bytes_sent() const { return total_bytes_sent_; }
 
   /// Returns true if the current thread is a thread executing the whole or part of
   /// a fragment instance.
@@ -191,6 +197,22 @@ class FragmentInstanceState {
   /// number should not be bumped for future reports.
   bool final_report_generated_ = false;
 
+  /// Total scan ranges complete across all scan nodes. Set in GetStatusReport().
+  int64_t scan_ranges_complete_ = 0;
+
+  /// Last peak memory consumption value. Set in GetStatusReport().
+  int64_t peak_mem_consumption_ = 0;
+
+  /// Last CPU user and system totals in ns. Set in GetStatusReport().
+  int64_t cpu_user_ns_ = 0;
+  int64_t cpu_sys_ns_ = 0;
+
+  /// Sum of BytesRead counters on this backend. Set in GetStatusReport().
+  int64_t bytes_read_ = 0;
+
+  /// Total bytes sent on exchanges in this backend. Set in GetStatusReport().
+  int64_t total_bytes_sent_ = 0;
+
   /// Profile for timings for each stage of the plan fragment instance's lifecycle.
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 7bf3a55..a810fbb 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -68,7 +68,7 @@ DECLARE_int64(rpc_max_message_size);
 DEFINE_int32_hidden(stress_status_report_delay_ms, 0, "Stress option to inject a delay "
     "before status reports. Has no effect on release builds.");
 
-using namespace impala;
+namespace impala {
 
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
@@ -417,20 +417,48 @@ void QueryState::ConstructReport(bool instances_started,
   host_profile_->ClearChunkedTimeSeriesCounters();
 
   if (instances_started) {
+    // Stats that we aggregate across the instances.
+    int64_t cpu_user_ns = 0;
+    int64_t cpu_sys_ns = 0;
+    int64_t bytes_read = 0;
+    int64_t scan_ranges_complete = 0;
+    int64_t exchange_bytes_sent = 0;
+    int64_t scan_bytes_sent = 0;
+
     for (const auto& entry : fis_map_) {
       FragmentInstanceState* fis = entry.second;
 
       // If this fragment instance has already sent its last report, skip it.
       if (fis->final_report_sent()) {
         DCHECK(fis->IsDone());
-        continue;
+      } else {
+        // Update the status and profiles of this fragment instance.
+        FragmentInstanceExecStatusPB* instance_status =
+            report->add_instance_exec_status();
+        profiles_forest->profile_trees.emplace_back();
+        fis->GetStatusReport(instance_status, &profiles_forest->profile_trees.back());
       }
 
-      // Update the status and profiles of this fragment instance.
-      FragmentInstanceExecStatusPB* instance_status = report->add_instance_exec_status();
-      profiles_forest->profile_trees.emplace_back();
-      fis->GetStatusReport(instance_status, &profiles_forest->profile_trees.back());
+      // Include these values for running and completed finstances in the status report.
+      cpu_user_ns += fis->cpu_user_ns();
+      cpu_sys_ns += fis->cpu_sys_ns();
+      bytes_read += fis->bytes_read();
+      scan_ranges_complete += fis->scan_ranges_complete();
+      // Determine whether this instance had a scan node in its plan.
+      // Note: this is hacky. E.g. it doesn't work for Kudu scans.
+      if (fis->bytes_read() > 0) {
+        scan_bytes_sent += fis->total_bytes_sent();
+      } else {
+        exchange_bytes_sent += fis->total_bytes_sent();
+      }
     }
+    report->set_peak_mem_consumption(query_mem_tracker_->peak_consumption());
+    report->set_cpu_user_ns(cpu_user_ns);
+    report->set_cpu_sys_ns(cpu_sys_ns);
+    report->set_bytes_read(bytes_read);
+    report->set_scan_ranges_complete(scan_ranges_complete);
+    report->set_exchange_bytes_sent(exchange_bytes_sent);
+    report->set_scan_bytes_sent(scan_bytes_sent);
   }
 }
 
@@ -808,3 +836,4 @@ Status QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tr
   }
   return Status::OK();
 }
+}
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index ba5454b..52495b8 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -129,6 +129,24 @@ message StatefulStatusPB {
   optional AuxErrorInfoPB aux_error_info = 3;
 }
 
+// Per-node stats required for the exec summary.
+message ExecSummaryDataPB {
+  // Plan node ID, set if this is for a PlanNode.
+  optional int32 plan_node_id = 1;
+
+  // Plan node ID, set if this is for a DataSink.
+  optional int32 data_sink_id = 2;
+
+  // Rows returned from this node, if this is a PlanNode.
+  optional int64 rows_returned = 3;
+
+  // Peak memory usage in bytes of this PlanNode or DataSink.
+  optional int64 peak_mem_usage = 4;
+
+  // Local time in nanoseconds spent in this plan node.
+  optional int64 local_time_ns = 5;
+}
+
 // RPC error metadata that can be associated with a AuxErrorInfoPB object. Created if a
 // RPC to another node failed.
 message RPCErrorInfoPB {
@@ -168,6 +186,9 @@ message FragmentInstanceExecStatusPB {
   // The non-idempotent parts of the report, and any prior reports that are not known to
   // have been received by the coordinator.
   repeated StatefulStatusPB stateful_report = 6;
+
+  // Per-node stats required for the exec summary.
+  repeated ExecSummaryDataPB exec_summary_data = 7;
 }
 
 message ReportExecStatusRequestPB {
@@ -191,6 +212,28 @@ message ReportExecStatusRequestPB {
   // the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
   // general error (e.g. failure to start fragment instances).
   optional UniqueIdPB fragment_instance_id = 6;
+
+  // Peak memory usage for this query on this backend in bytes.
+  optional int64 peak_mem_consumption = 7;
+
+  // User CPU utilization for the query on this backend in ns.
+  optional int64 cpu_user_ns = 8;
+
+  // System CPU utilization for the query on this backend in ns.
+  optional int64 cpu_sys_ns = 9;
+
+  // Sum of BytesRead counters on this backend.
+  optional int64 bytes_read = 10;
+
+  // Total scan ranges completed on this backend.
+  optional int64 scan_ranges_complete = 11;
+
+  // Total bytes sent by instances that did not contain a scan node.
+  optional int64 exchange_bytes_sent = 12;
+
+  // Total bytes sent by instances that contained a scan node.
+  optional int64 scan_bytes_sent = 13;
+
 }
 
 message ReportExecStatusResponsePB {
@@ -369,4 +412,4 @@ service ControlService {
 
   // Called to initiate shutdown of this backend.
   rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
-}
\ No newline at end of file
+}