You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2019/01/25 16:11:08 UTC

[impala] 03/04: IMPALA-7731: Add Read/Exchange counters to profile

This is an automated email from the ASF dual-hosted git repository.

mikeb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8c673ed6a00d1ed5c542a2311608993651bae850
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Wed Jan 16 10:06:05 2019 -0800

    IMPALA-7731: Add Read/Exchange counters to profile
    
    Selective scans (and by extension selective fragment instances)
    take higher performance hits when reading data remotely. They can
    be identified by a low ratio of data transmitted to data read from
    HDFS.
    
    This change adds several counters to the profile to make it easier to
    identify queries based on their scan instance selectivity.
    
    * TotalBytesSent - The total number of bytes sent by a query in
      exchange nodes. Does not include remote reads, data written to disk,
      or data sent to the client.
    
    * TotalScanBytesSent - The total number of bytes sent by fragment
      instances that had a scan node in their plan.
    
    * TotalInnerBytesSent - The total number of bytes sent by fragment
      instances that did not have a scan node in their plan, i.e. that
      received their input data from other instances through exchange
      nodes.
    
    * ExchangeScanRatio - The ratio between TotalScanBytesSent and
      TotalBytesRead, i.e. the selectivity over all fragment instances that
      had a scan node in their plan. This counter is also added to each
      fragment instance.
    
    * InnerNodeSelectivityRatio - The ratio between bytes sent by instances
      without a scan node in their plan (TotalInnerBytesSent) and bytes
      sent by instances with a scan node in their plan
      (TotalScanBytesSent). This indicates how well the inner nodes of the
      execution plan reduced the data volume.
    
    Change-Id: Ife7ec78fe42558429c1cbe6e5eba79842bffd648
    Reviewed-on: http://gerrit.cloudera.org:8080/12229
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/scan-node.cc                    |  1 +
 be/src/runtime/coordinator-backend-state.cc | 19 +++++++++++++--
 be/src/runtime/coordinator-backend-state.h  |  3 +++
 be/src/runtime/coordinator.cc               | 37 +++++++++++++++++++++++++++++
 be/src/runtime/coordinator.h                |  8 +++++++
 be/src/runtime/fragment-instance-state.cc   |  7 ++++++
 be/src/runtime/krpc-data-stream-sender.cc   |  2 ++
 be/src/runtime/krpc-data-stream-sender.h    |  3 +++
 be/src/runtime/runtime-state.cc             |  9 +++++++
 be/src/runtime/runtime-state.h              | 18 ++++++++++++++
 be/src/util/pretty-printer.h                |  1 -
 tests/query_test/test_observability.py      | 36 ++++++++++++++++++++++++++++
 12 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index ce04a38..906af66 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -131,6 +131,7 @@ Status ScanNode::Prepare(RuntimeState* state) {
 void ScanNode::AddBytesReadCounters() {
   bytes_read_counter_ =
       ADD_COUNTER(runtime_profile(), BYTES_READ_COUNTER, TUnit::BYTES);
+  runtime_state()->AddBytesReadCounter(bytes_read_counter_);
   bytes_read_timeseries_counter_ = ADD_TIME_SERIES_COUNTER(runtime_profile(),
       BYTES_READ_COUNTER, bytes_read_counter_);
   total_throughput_counter_ = runtime_profile()->AddRateCounter(
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index a2e1901..5d79450 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -32,6 +32,7 @@
 #include "runtime/debug-options.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "service/control-service.h"
 #include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
@@ -244,6 +245,18 @@ Coordinator::BackendState::ComputeResourceUtilizationLocked() {
       instance_utilization.bytes_read += c->value();
     }
 
+    int64_t bytes_sent = 0;
+    for (RuntimeProfile::Counter* c : entry.second->bytes_sent_counters_) {
+      bytes_sent += c->value();
+    }
+
+    // Determine whether this instance had a scan node in its plan.
+    if (instance_utilization.bytes_read > 0) {
+      instance_utilization.scan_bytes_sent = bytes_sent;
+    } else {
+      instance_utilization.exchange_bytes_sent = bytes_sent;
+    }
+
     RuntimeProfile::Counter* peak_mem =
         profile->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
     if (peak_mem != nullptr)
@@ -508,13 +521,15 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
   vector<RuntimeProfile*> children;
   profile_->GetAllChildren(&children);
   for (RuntimeProfile* p : children) {
-    // This profile is not for an exec node.
-    if (!p->metadata().__isset.plan_node_id) continue;
     RuntimeProfile::Counter* c = p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
     if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
 
     RuntimeProfile::Counter* bytes_read = p->GetCounter(ScanNode::BYTES_READ_COUNTER);
     if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
+
+    RuntimeProfile::Counter* bytes_sent =
+        p->GetCounter(KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER);
+    if (bytes_sent != nullptr) bytes_sent_counters_.push_back(bytes_sent);
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e84e097..15790f9 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -215,6 +215,9 @@ class Coordinator::BackendState {
     /// Collection of BYTES_READ_COUNTERs of all scan nodes in this fragment instance.
     std::vector<RuntimeProfile::Counter*> bytes_read_counters_;
 
+    /// Collection of TotalBytesSent of all data stream senders in this fragment instance.
+    std::vector<RuntimeProfile::Counter*> bytes_sent_counters_;
+
     /// Descriptor string for the last query status report time in the profile.
     static const char* LAST_REPORT_TIME_DESC;
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index e96d673..0142be9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -774,8 +774,45 @@ void Coordinator::ComputeQuerySummary() {
                     << ") ";
   }
 
+  // The total number of bytes read by this query.
   COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesRead", TUnit::BYTES),
       total_utilization.bytes_read);
+  // The total number of bytes sent by this query in exchange nodes. Does not include
+  // remote reads, data written to disk, or data sent to the client.
+  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesSent", TUnit::BYTES),
+      total_utilization.scan_bytes_sent + total_utilization.exchange_bytes_sent);
+  // The total number of bytes sent by fragment instances that had a scan node in their
+  // plan.
+  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalScanBytesSent", TUnit::BYTES),
+      total_utilization.scan_bytes_sent);
+  // The total number of bytes sent by fragment instances that did not have a scan node in
+  // their plan, i.e. that received their input data from other instances through exchange
+  // node.
+  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalInnerBytesSent", TUnit::BYTES),
+      total_utilization.exchange_bytes_sent);
+
+  double xchg_scan_ratio = 0;
+  if (total_utilization.bytes_read > 0) {
+    xchg_scan_ratio =
+        (double)total_utilization.scan_bytes_sent / total_utilization.bytes_read;
+  }
+  // The ratio between TotalScanBytesSent and TotalBytesRead, i.e. the selectivity over
+  // all fragment instances that had a scan node in their plan.
+  COUNTER_SET(ADD_COUNTER(query_profile_, "ExchangeScanRatio", TUnit::DOUBLE_VALUE),
+      xchg_scan_ratio);
+
+  double inner_node_ratio = 0;
+  if (total_utilization.scan_bytes_sent > 0) {
+    inner_node_ratio =
+        (double)total_utilization.exchange_bytes_sent / total_utilization.scan_bytes_sent;
+  }
+  // The ratio between bytes sent by instances with a scan node in their plan and
+  // instances without a scan node in their plan. This indicates how well the inner nodes
+  // of the execution plan reduced the data volume.
+  COUNTER_SET(
+      ADD_COUNTER(query_profile_, "InnerNodeSelectivityRatio", TUnit::DOUBLE_VALUE),
+      inner_node_ratio);
+
   COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
       total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9be32fa..a559c14 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -179,6 +179,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     /// Total bytes read across all scan nodes.
     int64_t bytes_read = 0;
 
+    /// Total bytes sent by instances that did not contain a scan node.
+    int64_t exchange_bytes_sent = 0;
+
+    /// Total bytes sent by instances that contained a scan node.
+    int64_t scan_bytes_sent = 0;
+
     /// Total user cpu consumed.
     int64_t cpu_user_ns = 0;
 
@@ -190,6 +196,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
       peak_per_host_mem_consumption =
           std::max(peak_per_host_mem_consumption, other.peak_per_host_mem_consumption);
       bytes_read += other.bytes_read;
+      exchange_bytes_sent += other.exchange_bytes_sent;
+      scan_bytes_sent += other.scan_bytes_sent;
       cpu_user_ns += other.cpu_user_ns;
       cpu_sys_ns += other.cpu_sys_ns;
     }
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index d37fda3..c8ef699 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -233,6 +233,13 @@ Status FragmentInstanceState::Prepare() {
   per_host_mem_usage_ =
       ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
 
+  profile()->AddDerivedCounter("ExchangeScanRatio", TUnit::DOUBLE_VALUE, [this](){
+      int64_t counter_val = 0;
+      *reinterpret_cast<double*>(&counter_val) =
+          runtime_state_->ComputeExchangeScanRatio();
+      return counter_val;
+      });
+
   row_batch_.reset(
       new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
         runtime_state_->instance_mem_tracker()));
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6bf3b78..123f302 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -67,6 +67,7 @@ namespace impala {
 const char* KrpcDataStreamSender::HASH_ROW_SYMBOL =
     "KrpcDataStreamSender7HashRowEPNS_8TupleRowE";
 const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStreamSender";
+const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 
 // A datastream sender may send row batches to multiple destinations. There is one
 // channel for each destination.
@@ -645,6 +646,7 @@ Status KrpcDataStreamSender::Prepare(
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
   rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
   bytes_sent_counter_ = ADD_COUNTER(profile(), "TotalBytesSent", TUnit::BYTES);
+  state->AddBytesSentCounter(bytes_sent_counter_);
   bytes_sent_time_series_counter_ =
       ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
   network_throughput_counter_ =
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index bf3cfdc..341d727 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -96,6 +96,9 @@ class KrpcDataStreamSender : public DataSink {
   /// illegal after calling Close().
   virtual void Close(RuntimeState* state) override;
 
+  /// Counters shared with other parts of the code
+  static const char* TOTAL_BYTES_SENT_COUNTER;
+
  protected:
   friend class DataStreamTest;
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 0899964..bcaa028 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -229,6 +229,15 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
   return Status::OK();
 }
 
+double RuntimeState::ComputeExchangeScanRatio() const {
+  int64_t bytes_read = 0;
+  for (const auto& c : bytes_read_counters_) bytes_read += c->value();
+  if (bytes_read == 0) return 0;
+  int64_t bytes_sent = 0;
+  for (const auto& c : bytes_sent_counters_) bytes_sent += c->value();
+  return (double)bytes_sent / bytes_read;
+}
+
 void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
     int64_t failed_allocation_size, const ErrorMsg* msg) {
   // Constructing the MemLimitExceeded and logging it is not cheap, so
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 66985a4..5025215 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -243,6 +243,18 @@ class RuntimeState {
    return total_thread_statistics_;
   }
 
+  void AddBytesReadCounter(RuntimeProfile::Counter* counter) {
+    bytes_read_counters_.push_back(counter);
+  }
+
+  void AddBytesSentCounter(RuntimeProfile::Counter* counter) {
+    bytes_sent_counters_.push_back(counter);
+  }
+
+  /// Computes the ratio between the bytes sent and the bytes read by this runtime state's
+  /// fragment instance. For fragment instances that don't scan data, this returns 0.
+  double ComputeExchangeScanRatio() const;
+
   /// Sets query_status_ with err_msg if no error has been set yet.
   void SetQueryStatus(const std::string& err_msg) {
     boost::lock_guard<SpinLock> l(query_status_lock_);
@@ -350,6 +362,12 @@ class RuntimeState {
   /// Total CPU utilization for all threads in this plan fragment.
   RuntimeProfile::ThreadCounters* total_thread_statistics_;
 
+  /// BytesRead counters in this instance's tree, not owned.
+  std::vector<RuntimeProfile::Counter*> bytes_read_counters_;
+
+  /// Counters for bytes sent over the network in this instance's tree, not owned.
+  std::vector<RuntimeProfile::Counter*> bytes_sent_counters_;
+
   /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index 03ba99e..6226eba 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -129,7 +129,6 @@ class PrettyPrinter {
         break;
       }
 
-      /// TODO: Remove DOUBLE_VALUE. IMPALA-1649
       case TUnit::DOUBLE_VALUE: {
         double output = *reinterpret_cast<double*>(&value);
         ss << std::setprecision(PRECISION) << output << " ";
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index efdf285..3aa2514 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import defaultdict
 from datetime import datetime
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -382,6 +383,41 @@ class TestObservability(ImpalaTestSuite):
         query_ids.append(query_id)
     assert len(query_ids) == 3, results.runtime_profile
 
+  def test_global_resource_counters_in_profile(self):
+    """Test that a set of global resource usage counters show up in the profile."""
+    query = "select count(*) from functional.alltypes"
+    profile = self.execute_query(query).runtime_profile
+    expected_counters = ["TotalBytesRead", "TotalBytesSent", "TotalScanBytesSent",
+                         "TotalInnerBytesSent", "ExchangeScanRatio",
+                         "InnerNodeSelectivityRatio"]
+    assert all(counter in profile for counter in expected_counters)
+
+  def test_global_exchange_counters(self):
+    """Test that global exchange counters are set correctly."""
+    query = """select count(*) from tpch_parquet.orders o inner join tpch_parquet.lineitem
+        l on o.o_orderkey = l.l_orderkey group by o.o_clerk limit 10"""
+    profile = self.execute_query(query).runtime_profile
+    assert "ExchangeScanRatio: 3.19" in profile
+
+    keys = ["TotalBytesSent", "TotalScanBytesSent", "TotalInnerBytesSent"]
+    counters = defaultdict(int)
+    for line in profile.splitlines():
+      for key in keys:
+        if key in line:
+          # Match byte count within parentheses
+          m = re.search("\(([0-9]+)\)", line)
+          assert m
+          # Only keep first (query-level) counter
+          if counters[key] == 0:
+            counters[key] = int(m.group(1))
+
+    # All counters have values
+    assert all(counters[key] > 0 for key in keys)
+
+    assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] +
+                                          counters["TotalInnerBytesSent"])
+
+
 class TestThriftProfile(ImpalaTestSuite):
   @classmethod
   def get_workload(self):