Posted to commits@impala.apache.org by ph...@apache.org on 2018/03/31 23:35:03 UTC

[02/12] impala git commit: IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender

IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender

This change implements several improvements to the profiles of
KrpcDataStreamRecvr and KrpcDataStreamSender:

- track the number of pending deferred row batches in KrpcDataStreamRecvr over time
- track the number of bytes dequeued from KrpcDataStreamRecvr over time
- track the total time the deferred RPC queues are not empty (see the sketch
  after this list)
- track the number of bytes sent from KrpcDataStreamSender over time
- track the total amount of time spent in KrpcDataStreamSender, including time
  spent waiting for RPC completion.
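
For reference, the "deferred RPC queues not empty" time is accumulated with a
simple transition-based pattern. Below is a minimal sketch of that pattern,
simplified from the receiver changes in this commit (locking, MemTracker
accounting and the RuntimeProfile counter plumbing are omitted;
MonotonicNanos() is the monotonic clock helper from util/time.h):

    std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
    int64_t has_deferred_rpcs_start_time_ns_ = 0;  // 0 means the queue is empty
    int64_t total_has_deferred_rpcs_time_ns_ = 0;  // reported as TotalHasDeferredRPCsTime

    void EnqueueDeferredRpc(std::unique_ptr<TransmitDataCtx> payload) {
      // Start the clock only on the empty -> non-empty transition.
      if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
      deferred_rpcs_.push(std::move(payload));
    }

    void DequeueDeferredRpc() {
      deferred_rpcs_.pop();
      // Stop the clock and accumulate on the non-empty -> empty transition.
      if (deferred_rpcs_.empty()) {
        total_has_deferred_rpcs_time_ns_ +=
            MonotonicNanos() - has_deferred_rpcs_start_time_ns_;
        has_deferred_rpcs_start_time_ns_ = 0;
      }
    }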

Sample profile of an Exchange node instance:

          EXCHANGE_NODE (id=21):(Total: 2s284ms, non-child: 64.926ms, % non-child: 2.84%)
             - ConvertRowBatchTime: 44.380ms
             - PeakMemoryUsage: 124.04 KB (127021)
             - RowsReturned: 287.51K (287514)
             - RowsReturnedRate: 125.88 K/sec
            Buffer pool:
               - AllocTime: 1.109ms
               - CumulativeAllocationBytes: 10.96 MB (11493376)
               - CumulativeAllocations: 562 (562)
               - PeakReservation: 112.00 KB (114688)
               - PeakUnpinnedBytes: 0
               - PeakUsedReservation: 112.00 KB (114688)
               - ReadIoBytes: 0
               - ReadIoOps: 0 (0)
               - ReadIoWaitTime: 0.000ns
               - WriteIoBytes: 0
               - WriteIoOps: 0 (0)
               - WriteIoWaitTime: 0.000ns
            Dequeue:
              BytesDequeued(500.000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700.00 KB, 2.00 MB, 3.49 MB, 4.39 MB, 5.86 MB, 6.85 MB
               - FirstBatchWaitTime: 0.000ns
               - TotalBytesDequeued: 6.85 MB (7187850)
               - TotalGetBatchTime: 2s237ms
                 - DataWaitTime: 2s219ms
            Enqueue:
              BytesReceived(500.000ms): 0, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 328.73 KB, 963.79 KB, 1.64 MB, 2.09 MB, 2.76 MB, 3.23 MB
              DeferredQueueSize(500.000ms): 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0
               - DispatchTime: (Avg: 108.593us ; Min: 30.525us ; Max: 1.524ms ; Number of samples: 281)
               - DeserializeRowBatchTime: 8.395ms
               - TotalBatchesEnqueued: 281 (281)
               - TotalBatchesReceived: 281 (281)
               - TotalBytesReceived: 3.23 MB (3387144)
               - TotalEarlySenders: 0 (0)
               - TotalEosReceived: 1 (1)
               - TotalHasDeferredRPCsTime: 15s446ms
               - TotalRPCsDeferred: 38 (38)
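
(As a rough cross-check of the new counters: DeferredQueueSize samples a depth
of 1 for about 31 of the 500ms intervals, i.e. roughly 15.5s with a non-empty
deferred queue, which lines up with TotalHasDeferredRPCsTime of 15s446ms.)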

Sample sender's profile:

        KrpcDataStreamSender (dst_id=21):(Total: 17s923ms, non-child: 604.494ms, % non-child: 3.37%)
          BytesSent(500.000ms): 0, 0, 0, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 46.54 KB, 46.54 KB, 46.54 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 974.44 KB, 2.82 MB, 4.93 MB, 6.27 MB, 8.28 MB, 9.69 MB
           - EosSent: 3 (3)
           - NetworkThroughput: 4.61 MB/sec
           - PeakMemoryUsage: 22.57 KB (23112)
           - RowsSent: 287.51K (287514)
           - RpcFailure: 0 (0)
           - RpcRetry: 0 (0)
           - SerializeBatchTime: 329.162ms
           - TotalBytesSent: 9.69 MB (10161432)
           - UncompressedRowBatchSize: 20.56 MB (21563550)
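
Note that NetworkThroughput is now derived from TotalBytesSent and the
aggregated network time ('total_network_timer_' in the diff below) rather than
from the sender's total wall-clock time. As a sanity check against the sample
above: 9.69 MB / 4.61 MB/sec is roughly 2.1s of aggregate network time across
the channels, a small fraction of the 17s923ms total, much of which is time
spent waiting for RPC completion (counted via the inactive timer).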

Change-Id: I8ba405921b3df920c1e85b940ce9c8d02fc647cd
Reviewed-on: http://gerrit.cloudera.org:8080/9690
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3855cb0f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3855cb0f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3855cb0f

Branch: refs/heads/2.x
Commit: 3855cb0f59b16d9051dc44efb08df7bb9a337e66
Parents: fc0af7f
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Mar 15 19:42:10 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 28 23:59:02 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc        |   2 -
 be/src/runtime/krpc-data-stream-mgr.cc    |   2 +-
 be/src/runtime/krpc-data-stream-recvr.cc  | 206 +++++++++++++++++--------
 be/src/runtime/krpc-data-stream-recvr.h   |  91 +++++++----
 be/src/runtime/krpc-data-stream-sender.cc |  58 +++----
 be/src/runtime/krpc-data-stream-sender.h  |  16 +-
 be/src/runtime/runtime-state.h            |   6 +-
 be/src/service/data-stream-service.cc     |  21 ++-
 be/src/service/data-stream-service.h      |   5 +
 common/protobuf/data_stream_service.proto |   6 +
 10 files changed, 278 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index bd76b57..0a0d81e 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -746,7 +746,6 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
   JoinSenders();
   EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
-  EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
 TEST_P(DataStreamTest, UnknownSenderLargeResult) {
@@ -756,7 +755,6 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   StartSender();
   JoinSenders();
   EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
-  EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
 TEST_P(DataStreamTest, Cancel) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 5a9305f..cd8d90b 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -255,7 +255,7 @@ void KrpcDataStreamMgr::DeserializeThreadFn(int thread_id, const DeserializeTask
     recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
     DCHECK(recvr != nullptr || already_unregistered);
   }
-  if (recvr != nullptr) recvr->DequeueDeferredRpc(task.sender_id);
+  if (recvr != nullptr) recvr->ProcessDeferredRpc(task.sender_id);
 }
 
 void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index c7126d4..6e47bd6 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -25,6 +25,7 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-mgr.h"
@@ -35,6 +36,7 @@
 #include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
 #include "util/test-info.h"
+#include "util/time.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
 
@@ -43,6 +45,8 @@
 DECLARE_bool(use_krpc);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 
+using kudu::MonoDelta;
+using kudu::MonoTime;
 using kudu::rpc::RpcContext;
 using std::condition_variable_any;
 
@@ -82,7 +86,7 @@ class KrpcDataStreamRecvr::SenderQueue {
   // On success, the first entry of 'deferred_rpcs_' is removed and the sender of the RPC
   // will be responded to. If the serialized row batch fails to be extracted from the
   // entry, the error status will be sent as reply.
-  void DequeueDeferredRpc();
+  void ProcessDeferredRpc();
 
   // Takes over the RPC state 'ctx' of an early sender for deferred processing and
   // kicks off a deserialization task to process it asynchronously. The ownership of
@@ -110,12 +114,26 @@ class KrpcDataStreamRecvr::SenderQueue {
   // soft limit of the receiver to be exceeded. Expected to be called with lock_ held.
   bool CanEnqueue(int64_t batch_size) const;
 
+  // Helper function for inserting 'payload' into 'deferred_rpcs_'. Also updates the
+  // counters which track the deferred RPCs.
+  void EnqueueDeferredRpc(unique_ptr<TransmitDataCtx> payload);
+
+  // Helper function for removing the first item from 'deferred_rpcs_'. Also updates
+  // the counters which track the deferred RPCs.
+  void DequeueDeferredRpc();
+
   // Unpacks a serialized row batch from 'request' and 'rpc_context' and populates
-  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
-  // be stored in 'batch_size'. On failure, the error status is returned.
+  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size is
+  // stored in 'deserialized_size'. If 'serialized_size' is not NULL, also stores the
+  // serialized row batch size in it. On failure, the error status is returned.
   static Status UnpackRequest(const TransmitDataRequestPB* request,
       RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data,
-      int64_t* batch_size);
+      int64_t* deserialized_size, int64_t* serialized_size = nullptr);
+
+  // Helper function to compute the serialized row batch size from 'request'
+  // and 'rpc_context'. Returns 0 on failure to unpack the serialized row batch.
+  static int64_t GetSerializedBatchSize(const TransmitDataRequestPB* request,
+      RpcContext* rpc_context);
 
   // The workhorse function for deserializing a row batch represented by ('header',
   // 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue'. Expects to be
@@ -183,6 +201,11 @@ class KrpcDataStreamRecvr::SenderQueue {
   // full when they last tried to do so. The senders wait here until there is a space for
   // their batches, allowing the receiver-side to implement basic flow-control.
   std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
+
+  // Monotonic time in nanoseconds of when 'deferred_rpcs_' goes from being empty to
+  // non-empty. Set to 0 when 'deferred_rpcs_' becomes empty again. Used for computing
+  // 'total_has_deferred_rpcs_timer_'.
+  int64_t has_deferred_rpcs_start_time_ns_ = 0;
 };
 
 KrpcDataStreamRecvr::SenderQueue::SenderQueue(
@@ -190,7 +213,7 @@ KrpcDataStreamRecvr::SenderQueue::SenderQueue(
   : recvr_(parent_recvr), num_remaining_senders_(num_senders) { }
 
 Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
-  SCOPED_TIMER(recvr_->queue_get_batch_time_);
+  SCOPED_TIMER(recvr_->queue_get_batch_timer_);
   DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   DCHECK(!recvr_->closed_);
   int num_to_dequeue = 0;
@@ -204,10 +227,15 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
 
     // Wait until something shows up or we know we're done
     while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) {
+      // Verify before waiting on 'data_arrival_cv_' that if there are any deferred
+      // batches, either there is an outstanding deserialization request queued or a
+      // pending insertion, so this thread is guaranteed to wake up at some point.
+      DCHECK(deferred_rpcs_.empty() ||
+          (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0);
       VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
                << " node=" << recvr_->dest_node_id();
       // Don't count time spent waiting on the sender as active time.
-      CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
+      CANCEL_SAFE_SCOPED_TIMER(recvr_->data_wait_timer_, &is_cancelled_);
       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
       CANCEL_SAFE_SCOPED_TIMER(
           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
@@ -248,7 +276,9 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
     DCHECK(!batch_queue_.empty());
     received_first_batch_ = true;
     RowBatch* result = batch_queue_.front().second.release();
-    recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+    int64_t batch_size = batch_queue_.front().first;
+    COUNTER_ADD(recvr_->bytes_dequeued_counter_, batch_size);
+    recvr_->num_buffered_bytes_.Add(-batch_size);
     batch_queue_.pop_front();
     VLOG_ROW << "fetched #rows=" << result->num_rows();
     current_batch_.reset(result);
@@ -276,9 +306,29 @@ inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) con
   return queue_empty || !recvr_->ExceedsLimit(batch_size);
 }
 
+void KrpcDataStreamRecvr::SenderQueue::EnqueueDeferredRpc(
+    unique_ptr<TransmitDataCtx> payload) {
+  if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
+  deferred_rpcs_.push(move(payload));
+  recvr_->num_deferred_rpcs_.Add(1);
+  COUNTER_ADD(recvr_->total_deferred_rpcs_counter_, 1);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+  deferred_rpcs_.pop();
+  if (deferred_rpcs_.empty()) {
+    DCHECK_NE(has_deferred_rpcs_start_time_ns_, 0);
+    int64_t duration = MonotonicNanos() - has_deferred_rpcs_start_time_ns_;
+    COUNTER_ADD(recvr_->total_has_deferred_rpcs_timer_, duration);
+    has_deferred_rpcs_start_time_ns_ = 0;
+  }
+  recvr_->num_deferred_rpcs_.Add(-1);
+}
+
 Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
     const TransmitDataRequestPB* request, RpcContext* rpc_context,
-    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) {
+    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* deserialized_size,
+    int64_t* serialized_size) {
   // Unpack the tuple offsets.
   KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
       request->tuple_offsets_sidecar_idx(), tuple_offsets),
@@ -288,8 +338,12 @@ Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
       request->tuple_data_sidecar_idx(), tuple_data),
       "Failed to get the tuple data sidecar");
   // Compute the size of the deserialized row batch.
-  *batch_size =
+  *deserialized_size =
       RowBatch::GetDeserializedSize(request->row_batch_header(), *tuple_offsets);
+  // Compute the size of the serialized row batch.
+  if (serialized_size != nullptr) {
+    *serialized_size = tuple_offsets->size() + tuple_data->size();
+  }
   return Status::OK();
 }
 
@@ -300,8 +354,6 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   DCHECK(lock->owns_lock());
   DCHECK(!is_cancelled_);
 
-  COUNTER_ADD(recvr_->num_received_batches_, 1);
-  COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
   // Bump 'num_pending_enqueue_' to avoid race with Close() when lock is dropped below.
@@ -331,7 +383,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
-  COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
+  COUNTER_ADD(recvr_->total_enqueued_batches_counter_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
   return Status::OK();
@@ -344,14 +396,15 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   kudu::Slice tuple_offsets;
   kudu::Slice tuple_data;
   int64_t batch_size;
-  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
       &batch_size);
   if (UNLIKELY(!status.ok())) {
-    status.ToProto(response->mutable_status());
-    rpc_context->RespondSuccess();
+    DataStreamService::RespondRpc(status, response, rpc_context);
     return;
   }
+  COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+  // To be consistent with the senders, only count the sidecar sizes.
+  COUNTER_ADD(recvr_->bytes_received_counter_, tuple_data.size() + tuple_offsets.size());
 
   {
     unique_lock<SpinLock> l(lock_);
@@ -361,8 +414,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
     // responded to if we reach here.
     DCHECK_GT(num_remaining_senders_, 0);
     if (UNLIKELY(is_cancelled_)) {
-      Status::OK().ToProto(response->mutable_status());
-      rpc_context->RespondSuccess();
+      DataStreamService::RespondRpc(Status::OK(), response, rpc_context);
       return;
     }
 
@@ -375,8 +427,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
     if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
       recvr_->deferred_rpc_tracker()->Consume(rpc_context->GetTransferSize());
       auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
-      deferred_rpcs_.push(move(payload));
-      COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+      EnqueueDeferredRpc(move(payload));
       return;
     }
 
@@ -385,11 +436,10 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   }
 
   // Respond to the sender to ack the insertion of the row batches.
-  status.ToProto(response->mutable_status());
-  rpc_context->RespondSuccess();
+  DataStreamService::RespondRpc(status, response, rpc_context);
 }
 
-void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+void KrpcDataStreamRecvr::SenderQueue::ProcessDeferredRpc() {
   // Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
   std::unique_ptr<TransmitDataCtx> ctx;
   Status status;
@@ -413,7 +463,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     if (UNLIKELY(!status.ok())) {
       DataStreamService::RespondAndReleaseRpc(status, ctx->response, ctx->rpc_context,
           recvr_->deferred_rpc_tracker());
-      deferred_rpcs_.pop();
+      DequeueDeferredRpc();
       return;
     }
 
@@ -426,7 +476,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     }
 
     // Dequeues the deferred batch and adds it to 'batch_queue_'.
-    deferred_rpcs_.pop();
+    DequeueDeferredRpc();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
     DCHECK(!status.ok() || !batch_queue_.empty());
@@ -437,8 +487,20 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
 
   // Responds to the sender to ack the insertion of the row batches.
   // No need to hold lock when enqueuing the response.
-  status.ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  DataStreamService::RespondRpc(status, ctx->response, ctx->rpc_context);
+}
+
+int64_t KrpcDataStreamRecvr::SenderQueue::GetSerializedBatchSize(
+    const TransmitDataRequestPB* request, RpcContext* rpc_context) {
+  kudu::Slice tuple_offsets;
+  kudu::Slice tuple_data;
+  int64_t unused;
+  int64_t serialized_size = 0;
+  if (UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data, &unused,
+          &serialized_size).ok()) {
+    return serialized_size;
+  }
+  return 0;
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
@@ -448,21 +510,21 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
   // 'recvr_->mgr_' shouldn't be NULL.
   DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   DCHECK(!recvr_->closed_ && recvr_->mgr_ != nullptr);
+  COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+  COUNTER_ADD(recvr_->bytes_received_counter_,
+      GetSerializedBatchSize(ctx->request, ctx->rpc_context));
   int sender_id = ctx->request->sender_id();
-  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
     if (UNLIKELY(is_cancelled_)) {
-      Status::OK().ToProto(ctx->response->mutable_status());
-      ctx->rpc_context->RespondSuccess();
+      DataStreamService::RespondRpc(Status::OK(), ctx->response, ctx->rpc_context);
       return;
     }
     // Only enqueue a deferred RPC if the sender queue is not yet cancelled.
     recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
-    deferred_rpcs_.push(move(ctx));
+    EnqueueDeferredRpc(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
-  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
       recvr_->dest_node_id(), sender_id, 1);
 }
@@ -489,7 +551,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
       const unique_ptr<TransmitDataCtx>& ctx = deferred_rpcs_.front();
       DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
           ctx->rpc_context, recvr_->deferred_rpc_tracker());
-      deferred_rpcs_.pop();
+      DequeueDeferredRpc();
     }
   }
   VLOG_QUERY << "cancelled stream: fragment_instance_id="
@@ -565,8 +627,8 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     parent_tracker_(parent_tracker),
     buffer_pool_client_(client),
     profile_(profile),
-    recvr_side_profile_(RuntimeProfile::Create(&pool_, "RecvrSide")),
-    sender_side_profile_(RuntimeProfile::Create(&pool_, "SenderSide")) {
+    dequeue_profile_(RuntimeProfile::Create(&pool_, "Dequeue")),
+    enqueue_profile_(RuntimeProfile::Create(&pool_, "Enqueue")) {
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
@@ -576,36 +638,47 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     sender_queues_.push_back(queue);
   }
 
-  // Add the receiver and sender sides' profiles as children of the owning exchange
-  // node's profile.
-  profile_->AddChild(recvr_side_profile_);
-  profile_->AddChild(sender_side_profile_);
+  // Add the profiles of the dequeuing side (i.e. GetBatch()) and the enqueuing side
+  // (i.e. AddBatchWork()) as children of the owning exchange node's profile.
+  profile_->AddChild(dequeue_profile_);
+  profile_->AddChild(enqueue_profile_);
+
+  // Initialize various counters for measuring dequeuing from queues.
+  bytes_dequeued_counter_ =
+      ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
+  bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+      dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
+  queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
+  data_wait_timer_ =
+      ADD_CHILD_TIMER(dequeue_profile_, "DataWaitTime", "TotalGetBatchTime");
+  inactive_timer_ = profile_->inactive_timer();
+  first_batch_wait_total_timer_ =
+      ADD_TIMER(dequeue_profile_, "FirstBatchWaitTime");
 
-  // Initialize the counters
+  // Initialize various counters for measuring enqueuing into queues.
   bytes_received_counter_ =
-      ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
+      ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
   bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
-      recvr_side_profile_, "BytesReceived", bytes_received_counter_);
-  queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
-  data_arrival_timer_ =
-      ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
-  inactive_timer_ = profile_->inactive_timer();
-  first_batch_wait_total_timer_ =
-      ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
+      enqueue_profile_, "BytesReceived", bytes_received_counter_);
   deserialize_row_batch_timer_ =
-      ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
-  num_early_senders_ =
-      ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
-  num_arrived_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
-  num_received_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
-  num_enqueued_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
-  num_deferred_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
-  num_eos_received_ =
-      ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
+      ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
+  total_eos_received_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalEosReceived", TUnit::UNIT);
+  total_early_senders_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalEarlySenders", TUnit::UNIT);
+  total_received_batches_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalBatchesReceived", TUnit::UNIT);
+  total_enqueued_batches_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalBatchesEnqueued", TUnit::UNIT);
+  total_deferred_rpcs_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
+  deferred_rpcs_time_series_counter_ =
+      enqueue_profile_->AddTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
+      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+  total_has_deferred_rpcs_timer_ =
+      ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
+  dispatch_timer_ =
+      ADD_SUMMARY_STATS_TIMER(enqueue_profile_, "DispatchTime");
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -616,28 +689,30 @@ Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
 
 void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  MonoDelta duration(MonoTime::Now().GetDeltaSince(rpc_context->GetTimeReceived()));
+  dispatch_timer_->UpdateCounter(duration.ToNanoseconds());
   int use_sender_id = is_merging_ ? request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
   sender_queues_[use_sender_id]->AddBatch(request, response, rpc_context);
 }
 
-void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) {
+void KrpcDataStreamRecvr::ProcessDeferredRpc(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   // Add all batches to the same queue if is_merging_ is false.
-  sender_queues_[use_sender_id]->DequeueDeferredRpc();
+  sender_queues_[use_sender_id]->ProcessDeferredRpc();
 }
 
 void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
   int use_sender_id = is_merging_ ? ctx->request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
   sender_queues_[use_sender_id]->TakeOverEarlySender(move(ctx));
-  COUNTER_ADD(num_early_senders_, 1);
+  COUNTER_ADD(total_early_senders_counter_, 1);
 }
 
 void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   sender_queues_[use_sender_id]->DecrementSenders();
-  COUNTER_ADD(num_eos_received_, 1);
+  COUNTER_ADD(total_eos_received_counter_, 1);
 }
 
 void KrpcDataStreamRecvr::CancelStream() {
@@ -660,7 +735,8 @@ void KrpcDataStreamRecvr::Close() {
   // Given all queues have been cancelled and closed already at this point, it's safe to
   // call Close() on 'deferred_rpc_tracker_' without holding any lock here.
   deferred_rpc_tracker_->Close();
-  recvr_side_profile_->StopPeriodicCounters();
+  dequeue_profile_->StopPeriodicCounters();
+  enqueue_profile_->StopPeriodicCounters();
 
   // Remove reference to the unowned resources which may be freed after Close().
   mgr_ = nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 845dbf5..1435e44 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -145,7 +145,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// identified by 'sender_id'. If is_merging_ is false, it always defaults to
   /// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id_'.
   /// Called from KrpcDataStreamMgr's deserialization threads only.
-  void DequeueDeferredRpc(int sender_id);
+  void ProcessDeferredRpc(int sender_id);
 
   /// Takes over the RPC state 'ctx' of an early sender for deferred processing and
   /// kicks off a deserialization task to process it asynchronously. This makes sure
@@ -167,6 +167,9 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
     return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
   }
 
+  /// Return the current number of deferred RPCs.
+  int64_t num_deferred_rpcs() const { return num_deferred_rpcs_.Load(); }
+
   /// KrpcDataStreamMgr instance used to create this recvr. Not owned.
   KrpcDataStreamMgr* mgr_;
 
@@ -190,9 +193,12 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// from the fragment execution thread.
   bool closed_;
 
-  /// total number of bytes held across all sender queues.
+  /// Current number of bytes held across all sender queues.
   AtomicInt32 num_buffered_bytes_;
 
+  /// Current number of outstanding deferred RPCs across all sender queues.
+  AtomicInt64 num_deferred_rpcs_;
+
   /// Memtracker for payloads of deferred Rpcs in the sender queue(s). This must be
   /// accessed with a sender queue's lock held to avoid race with Close() of the queue.
   boost::scoped_ptr<MemTracker> deferred_rpc_tracker_;
@@ -217,60 +223,79 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   ObjectPool pool_;
 
   /// Runtime profile of the owning exchange node. It's the parent of
-  /// 'recvr_side_profile_' and 'sender_side_profile_'. Not owned.
+  /// 'dequeue_profile_' and 'enqueue_profile_'. Not owned.
   RuntimeProfile* profile_;
 
   /// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
   /// and sender side measurements (from AddBatch()). These two profiles own all counters
   /// below unless otherwise noted. These profiles are owned by the receiver and placed
-  /// in 'pool_'. 'recvr_side_profile_' and 'sender_side_profile_' must outlive 'profile_'
+  /// in 'pool_'. 'dequeue_profile_' and 'enqueue_profile_' must outlive 'profile_'
   /// to prevent accessing freed memory during top-down traversal of 'profile_'. The
   /// receiver is co-owned by the exchange node and the data stream manager so these two
   /// profiles should outlive the exchange node which owns 'profile_'.
-  RuntimeProfile* recvr_side_profile_;
-  RuntimeProfile* sender_side_profile_;
+  RuntimeProfile* dequeue_profile_;
+  RuntimeProfile* enqueue_profile_;
 
-  /// Number of bytes received but not necessarily enqueued.
-  RuntimeProfile::Counter* bytes_received_counter_;
+  /// Pointer to profile's inactive timer. Not owned.
+  /// Not directly shown in the profile. Updated together with 'data_wait_timer_' below;
+  /// used for subtracting the wait time from the total time spent in the exchange node.
+  RuntimeProfile::Counter* inactive_timer_;
 
-  /// Time series of number of bytes received, samples bytes_received_counter_.
-  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
+  /// ------------------------------------------------------------------------------------
+  /// Following counters belong to 'dequeue_profile_'.
 
-  /// Total wall-clock time spent deserializing row batches.
-  RuntimeProfile::Counter* deserialize_row_batch_timer_;
+  /// Number of bytes of deserialized row batches dequeued.
+  RuntimeProfile::Counter* bytes_dequeued_counter_;
+
+  /// Time series of bytes of deserialized row batches, samples 'bytes_dequeued_counter_'.
+  RuntimeProfile::TimeSeriesCounter* bytes_dequeued_time_series_counter_;
+
+  /// Total wall-clock time spent in SenderQueue::GetBatch().
+  RuntimeProfile::Counter* queue_get_batch_timer_;
 
-  /// Number of senders which arrive before the receiver is ready.
-  RuntimeProfile::Counter* num_early_senders_;
+  /// Total wall-clock time spent waiting for data to be available in queues.
+  RuntimeProfile::Counter* data_wait_timer_;
 
-  /// Time spent waiting until the first batch arrives across all queues.
-  /// TODO: Turn this into a wall-clock timer.
+  /// Wall-clock time spent waiting for the first batch arrival across all queues.
   RuntimeProfile::Counter* first_batch_wait_total_timer_;
 
-  /// Total number of batches which arrived at this receiver but not necessarily received
-  /// or enqueued. An arrived row batch will eventually be received if there is no error
-  /// unpacking the RPC payload and the receiving stream is not cancelled.
-  RuntimeProfile::Counter* num_arrived_batches_;
+  /// ------------------------------------------------------------------------------------
+  /// Following counters belong to 'enqueue_profile_'.
 
-  /// Total number of batches received but not necessarily enqueued.
-  RuntimeProfile::Counter* num_received_batches_;
+  /// Total number of bytes of serialized row batches received.
+  RuntimeProfile::Counter* bytes_received_counter_;
 
-  /// Total number of batches received and enqueued into the row batch queue.
-  RuntimeProfile::Counter* num_enqueued_batches_;
+  /// Time series of number of bytes received, samples 'bytes_received_counter_'.
+  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
 
-  /// Total number of batches deferred because of early senders or full row batch queue.
-  RuntimeProfile::Counter* num_deferred_batches_;
+  /// Total wall-clock time spent deserializing row batches.
+  RuntimeProfile::Counter* deserialize_row_batch_timer_;
 
   /// Total number of EOS received.
-  RuntimeProfile::Counter* num_eos_received_;
+  RuntimeProfile::Counter* total_eos_received_counter_;
 
-  /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
-  RuntimeProfile::Counter* data_arrival_timer_;
+  /// Total number of senders which arrive before the receiver is ready.
+  RuntimeProfile::Counter* total_early_senders_counter_;
 
-  /// Pointer to profile's inactive timer. Not owned.
-  RuntimeProfile::Counter* inactive_timer_;
+  /// Total number of serialized row batches received.
+  RuntimeProfile::Counter* total_received_batches_counter_;
+
+  /// Total number of deserialized row batches enqueued into the row batch queues.
+  RuntimeProfile::Counter* total_enqueued_batches_counter_;
+
+  /// Total number of RPCs whose responses are deferred because of early senders or
+  /// full row batch queue.
+  RuntimeProfile::Counter* total_deferred_rpcs_counter_;
+
+  /// Time series of number of deferred row batches, samples 'num_deferred_rpcs_'.
+  RuntimeProfile::TimeSeriesCounter* deferred_rpcs_time_series_counter_;
+
+  /// Total wall-clock time in which the 'deferred_rpcs_' queues are not empty.
+  RuntimeProfile::Counter* total_has_deferred_rpcs_timer_;
 
-  /// Total time spent in SenderQueue::GetBatch().
-  RuntimeProfile::Counter* queue_get_batch_time_;
+  /// Summary stats of the time RPCs spent in the KRPC service queue before
+  /// being dispatched to the RPC handlers.
+  RuntimeProfile::SummaryStatsCounter* dispatch_timer_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index bc6a07d..0f11dec 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -142,8 +142,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // This function blocks until the EOS RPC is complete.
   Status FlushAndSendEos(RuntimeState* state);
 
-  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
-
   // The type for a RPC worker function.
   typedef boost::function<Status()> DoRpcFn;
 
@@ -160,9 +158,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   const TUniqueId fragment_instance_id_;
   const PlanNodeId dest_node_id_;
 
-  // Number of bytes of all serialized row batches sent successfully.
-  int64_t num_data_bytes_sent_ = 0;
-
   // The row batch for accumulating rows copied from AddRow().
   // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
   scoped_ptr<RowBatch> batch_;
@@ -211,6 +206,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // The pointer to the current serialized row batch being sent.
   const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;
 
+  // The monotonic time in nanoseconds of when the current RPC started.
+  int64_t rpc_start_time_ns_ = 0;
+
   // True if there is an in-flight RPC.
   bool rpc_in_flight_ = false;
 
@@ -311,12 +309,14 @@ void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
   rpc_done_cv_.notify_one();
+  rpc_start_time_ns_ = 0;
 }
 
 Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
+  SCOPED_TIMER(parent_->profile()->inactive_timer());
   SCOPED_TIMER(parent_->state_->total_network_send_timer());
 
   // Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled.
@@ -379,17 +379,20 @@ void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
 }
 
 void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
+  DCHECK_NE(rpc_start_time_ns_, 0);
+  int64_t total_time = MonotonicNanos() - rpc_start_time_ns_;
   std::unique_lock<SpinLock> l(lock_);
   DCHECK(rpc_in_flight_);
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
+    DCHECK(rpc_in_flight_batch_ != nullptr);
+    COUNTER_ADD(parent_->bytes_sent_counter_,
+        RowBatch::GetSerializedSize(*rpc_in_flight_batch_));
+    int64_t network_time = total_time - resp_.receiver_latency_ns();
+    COUNTER_ADD(&parent_->total_network_timer_, network_time);
     Status rpc_status = Status::OK();
     int32_t status_code = resp_.status().status_code();
-    if (LIKELY(status_code == TErrorCode::OK)) {
-      DCHECK(rpc_in_flight_batch_ != nullptr);
-      num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
-      VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
-    } else if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
+    if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
       remote_recvr_closed_ = true;
     } else {
       rpc_status = Status(resp_.status());
@@ -429,6 +432,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
   req.set_tuple_offsets_sidecar_idx(sidecar_idx);
 
   // Add 'tuple_data_' as sidecar.
+  rpc_start_time_ns_ = MonotonicNanos();
   KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
       RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
       "Unable to add tuple data to sidecar");
@@ -572,6 +576,7 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r
     sender_id_(sender_id),
     partition_type_(sink.output_partition.type),
     per_channel_buffer_size_(per_channel_buffer_size),
+    total_network_timer_(TUnit::TIME_NS, 0),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
@@ -605,6 +610,7 @@ KrpcDataStreamSender::~KrpcDataStreamSender() {
 
 Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
     const TDataSink& tsink, RuntimeState* state) {
+  SCOPED_TIMER(profile_->total_time_counter());
   DCHECK(tsink.__isset.stream_sink);
   if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
       partition_type_ == TPartitionType::KUDU) {
@@ -625,15 +631,17 @@ Status KrpcDataStreamSender::Prepare(
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
   rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
-  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  bytes_sent_counter_ = ADD_COUNTER(profile(), "TotalBytesSent", TUnit::BYTES);
+  bytes_sent_time_series_counter_ =
+      ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
+  network_throughput_counter_ =
+      profile()->AddDerivedCounter("NetworkThroughput", TUnit::BYTES_PER_SECOND,
+          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
+              &total_network_timer_));
   eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
-  overall_throughput_ =
-      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
-           bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                         profile()->total_time_counter()));
+  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsSent", TUnit::UNIT);
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state));
   }
@@ -641,17 +649,19 @@ Status KrpcDataStreamSender::Prepare(
 }
 
 Status KrpcDataStreamSender::Open(RuntimeState* state) {
+  SCOPED_TIMER(profile_->total_time_counter());
   return ScalarExprEvaluator::Open(partition_expr_evals_, state);
 }
 
 Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
   DCHECK(!closed_);
   DCHECK(!flushed_);
 
   if (batch->num_rows() == 0) return Status::OK();
   if (partition_type_ == TPartitionType::UNPARTITIONED) {
     OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
-    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch));
+    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
     // TransmitData() will block if there are still in-flight rpcs (and those will
     // reference the previously written serialized batch).
     for (int i = 0; i < channels_.size(); ++i) {
@@ -736,6 +746,7 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
   DCHECK(!flushed_);
   DCHECK(!closed_);
   flushed_ = true;
@@ -749,12 +760,14 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
 }
 
 void KrpcDataStreamSender::Close(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
   if (closed_) return;
   for (int i = 0; i < channels_.size(); ++i) {
     channels_[i]->Teardown(state);
   }
   ScalarExprEvaluator::Close(partition_expr_evals_, state);
   ScalarExpr::Close(partition_exprs_);
+  profile()->StopPeriodicCounters();
   DataSink::Close(state);
 }
 
@@ -762,25 +775,16 @@ Status KrpcDataStreamSender::SerializeBatch(
     RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
   VLOG_ROW << "serializing " << src->num_rows() << " rows";
   {
-    SCOPED_TIMER(profile_->total_time_counter());
     SCOPED_TIMER(serialize_batch_timer_);
     RETURN_IF_ERROR(src->Serialize(dest));
-    int64_t bytes = RowBatch::GetSerializedSize(*dest);
     int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
-    COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
   }
   return Status::OK();
 }
 
 int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
-  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
-  // atomic?
-  int64_t result = 0;
-  for (int i = 0; i < channels_.size(); ++i) {
-    result += channels_[i]->num_data_bytes_sent();
-  }
-  return result;
+  return bytes_sent_counter_->value();
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index bc18574..e6c6ccf 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,20 +157,28 @@ class KrpcDataStreamSender : public DataSink {
   /// Total number of times RPC fails or the remote responds with a non-retryable error.
   RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
 
-  /// Total number of bytes sent.
+  /// Total number of bytes sent. Updated on RPC completion.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
+  /// Time series of number of bytes sent, samples bytes_sent_counter_.
+  RuntimeProfile::TimeSeriesCounter* bytes_sent_time_series_counter_ = nullptr;
+
   /// Total number of EOS sent.
   RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
 
-  /// Total number of bytes of the row batches before compression.
+  /// Total number of bytes of row batches before compression.
   RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
 
   /// Total number of rows sent.
   RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
 
-  /// Throughput per total time spent in sender
-  RuntimeProfile::Counter* overall_throughput_ = nullptr;
+  /// Approximate network throughput for sending row batches.
+  RuntimeProfile::Counter* network_throughput_counter_ = nullptr;
+
+  /// Aggregated time spent in the network (including KRPC transfer queue time) for
+  /// transmitting the RPC requests and receiving the responses. Used for computing
+  /// 'network_throughput_counter_'. Not meaningful by itself so not shown in profile.
+  RuntimeProfile::Counter total_network_timer_;
 
   /// Identifier of the destination plan node.
   PlanNodeId dest_node_id_;

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b292789..7edd718 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -356,7 +356,11 @@ class RuntimeState {
   /// Total time waiting in storage (across all threads)
   RuntimeProfile::Counter* total_storage_wait_timer_;
 
-  /// Total time spent sending over the network (across all threads)
+  /// Total time spent waiting for RPCs to complete. This time is a combination of:
+  /// - network time of sending the RPC payload to the destination
+  /// - processing and queuing time in the destination
+  /// - network time of sending the RPC response to the originating node
+  /// TODO: rename this counter and account for the 3 components above. IMPALA-6705.
   RuntimeProfile::Counter* total_network_send_timer_;
 
   /// Total time spent receiving over the network (across all threads)

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index d94837b..1d42a99 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
@@ -36,6 +37,8 @@
 #include "common/names.h"
 
 using kudu::rpc::RpcContext;
+using kudu::MonoDelta;
+using kudu::MonoTime;
 
 static const string queue_limit_msg = "(Advanced) Limit on RPC payloads consumption for "
     "DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
@@ -82,13 +85,27 @@ void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
 }
 
 template<typename ResponsePBType>
+void DataStreamService::RespondRpc(const Status& status,
+    ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
+  MonoDelta duration(MonoTime::Now().GetDeltaSince(ctx->GetTimeReceived()));
+  status.ToProto(response->mutable_status());
+  response->set_receiver_latency_ns(duration.ToNanoseconds());
+  ctx->RespondSuccess();
+}
+
+template<typename ResponsePBType>
 void DataStreamService::RespondAndReleaseRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker) {
   mem_tracker->Release(ctx->GetTransferSize());
-  status.ToProto(response->mutable_status());
-  ctx->RespondSuccess();
+  RespondRpc(status, response, ctx);
 }
 
+template void DataStreamService::RespondRpc(const Status& status,
+    TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx);
+
+template void DataStreamService::RespondRpc(const Status& status,
+    EndDataStreamResponsePB* response, kudu::rpc::RpcContext* ctx);
+
 template void DataStreamService::RespondAndReleaseRpc(const Status& status,
     TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx,
     MemTracker* mem_tracker);

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 812fb2c..e233165 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -62,6 +62,11 @@ class DataStreamService : public DataStreamServiceIf {
   static void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
       kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
 
+  /// Respond to an RPC passed in 'response'/'ctx' with 'status'. Takes ownership of 'ctx'.
+  template<typename ResponsePBType>
+  static void RespondRpc(const Status& status, ResponsePBType* response,
+      kudu::rpc::RpcContext* ctx);
+
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index c2045d2..854eb87 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -49,6 +49,9 @@ message TransmitDataRequestPB {
 message TransmitDataResponsePB {
   // Status::OK() on success; Error status on failure.
   optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
 }
 
 // All fields are required in V1.
@@ -66,6 +69,9 @@ message EndDataStreamRequestPB {
 // All fields are required in V1.
 message EndDataStreamResponsePB {
   optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
 }
 
 // Handles data transmission between fragment instances.
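
(A note on how 'receiver_latency_ns' above is consumed: in
TransmitDataCompleteCb() the sender computes

    network_time = (RPC completion time - rpc_start_time_ns_) - receiver_latency_ns

i.e. the RPC round-trip time minus the time the request spent in the receiving
daemon, and accumulates it into 'total_network_timer_', which in turn drives
the new NetworkThroughput counter.)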