You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/21 21:38:52 UTC

[2/3] impala git commit: IMPALA-7449: Fix network throughput calculation of DataStreamSender

IMPALA-7449: Fix network throughput calculation of DataStreamSender

Currently, the network throughput presented in the query profile
for DataStreamSender is computed by dividing the total bytes sent
by the total network time, which is the sum of the observed network
times of all individual RPCs. This is wrong in general and only makes
sense if the network throughput is fixed. In addition, RPCs are
asynchronous and overlap with each other, so the summed network time
(i.e. the total bytes sent divided by the reported network throughput)
may exceed the wall-clock time, making the reported throughput
impossible to interpret.

This change fixes the problem by measuring the network throughput
of each individual RPC and using a summary counter to track the
avg/min/max of these per-RPC network throughputs instead.

Change-Id: I344ac76c0a1a49b4da3d37d2c547f3d5051ebe24
Reviewed-on: http://gerrit.cloudera.org:8080/11241
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2a60655b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2a60655b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2a60655b

Branch: refs/heads/master
Commit: 2a60655b09afaa76d3bf2120c7043eb0b22eefcf
Parents: f849eff
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Aug 15 14:25:16 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Aug 21 21:29:07 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/krpc-data-stream-sender.cc | 17 ++++++++++-------
 be/src/runtime/krpc-data-stream-sender.h  | 11 ++++-------
 2 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2a60655b/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index e6cea1f..6a2e5b3 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -394,10 +394,16 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
     DCHECK(rpc_in_flight_batch_ != nullptr);
-    COUNTER_ADD(parent_->bytes_sent_counter_,
-        RowBatch::GetSerializedSize(*rpc_in_flight_batch_));
+    int64_t row_batch_size = RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
     int64_t network_time = total_time - resp_.receiver_latency_ns();
-    COUNTER_ADD(&parent_->total_network_timer_, network_time);
+    COUNTER_ADD(parent_->bytes_sent_counter_, row_batch_size);
+    if (LIKELY(network_time > 0)) {
+      // 'row_batch_size' is bounded by FLAGS_rpc_max_message_size which shouldn't exceed
+      // max 32-bit signed value so multiplication below should not overflow.
+      DCHECK_LE(row_batch_size, numeric_limits<int32_t>::max());
+      int64_t network_throughput = row_batch_size * NANOS_PER_SEC / network_time;
+      parent_->network_throughput_counter_->UpdateCounter(network_throughput);
+    }
     Status rpc_status = Status::OK();
     int32_t status_code = resp_.status().status_code();
     if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
@@ -584,7 +590,6 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r
     sender_id_(sender_id),
     partition_type_(sink.output_partition.type),
     per_channel_buffer_size_(per_channel_buffer_size),
-    total_network_timer_(TUnit::TIME_NS, 0),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
@@ -642,9 +647,7 @@ Status KrpcDataStreamSender::Prepare(
   bytes_sent_time_series_counter_ =
       ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
   network_throughput_counter_ =
-      profile()->AddDerivedCounter("NetworkThroughput", TUnit::BYTES_PER_SECOND,
-          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-              &total_network_timer_));
+      ADD_SUMMARY_STATS_COUNTER(profile(), "NetworkThroughput", TUnit::BYTES_PER_SECOND);
   eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/impala/blob/2a60655b/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index 6757c2a..65580b1 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -207,13 +207,10 @@ class KrpcDataStreamSender : public DataSink {
   /// Total number of rows sent.
   RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
 
-  /// Approximate network throughput for sending row batches.
-  RuntimeProfile::Counter* network_throughput_counter_ = nullptr;
-
-  /// Aggregated time spent in network (including queuing time in KRPC transfer queue)
-  /// for transmitting the RPC requests and receiving the responses. Used for computing
-  /// 'network_throughput_'. Not too meaningful by itself so not shown in profile.
-  RuntimeProfile::Counter total_network_timer_;
+  /// Summary of network throughput for sending row batches. Network time also includes
+  /// queuing time in KRPC transfer queue for transmitting the RPC requests and receiving
+  /// the responses.
+  RuntimeProfile::SummaryStatsCounter* network_throughput_counter_ = nullptr;
 
   /// Identifier of the destination plan node.
   PlanNodeId dest_node_id_;