Posted to commits@impala.apache.org by ph...@apache.org on 2018/03/31 23:35:03 UTC
[02/12] impala git commit: IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender
IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender
This change implements several improvements to the profiles of
KrpcDataStreamRecvr and KrpcDataStreamSender:
- track the number of pending deferred row batches over time in KrpcDataStreamRecvr
- track the number of bytes dequeued over time in KrpcDataStreamRecvr
- track the total time the deferred RPC queues are not empty
- track the number of bytes sent from KrpcDataStreamSender over time
- track the total amount of time spent in KrpcDataStreamSender, including time
spent waiting for RPC completion.
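The "queue not empty" timer follows a simple stamp-and-accumulate pattern: record the
monotonic time when the queue transitions from empty to non-empty, and add the elapsed
duration to the counter when the queue drains. Below is a minimal standalone C++ sketch
of this pattern with simplified, hypothetical names (no locking, std::chrono in place of
Impala's MonotonicNanos()); in the patch the real bookkeeping lives in
SenderQueue::EnqueueDeferredRpc() and SenderQueue::DequeueDeferredRpc() and feeds the
TotalHasDeferredRPCsTime counter:

    #include <chrono>
    #include <cstdint>

    // Tracks how long a queue stays non-empty (cf. TotalHasDeferredRPCsTime).
    // Simplified sketch: single-threaded, standalone clock.
    class QueueOccupancyTimer {
     public:
      void OnEnqueue() {
        // Stamp the transition from empty to non-empty.
        if (size_++ == 0) non_empty_start_ns_ = NowNanos();
      }

      void OnDequeue() {
        // On the transition back to empty, accumulate the elapsed duration.
        if (--size_ == 0) {
          total_non_empty_ns_ += NowNanos() - non_empty_start_ns_;
          non_empty_start_ns_ = 0;
        }
      }

      int64_t total_non_empty_ns() const { return total_non_empty_ns_; }

     private:
      static int64_t NowNanos() {
        using namespace std::chrono;
        return duration_cast<nanoseconds>(
            steady_clock::now().time_since_epoch()).count();
      }

      int64_t size_ = 0;
      int64_t non_empty_start_ns_ = 0;  // 0 <=> queue currently empty
      int64_t total_non_empty_ns_ = 0;
    };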
Sample profile of an Exchange node instance:
EXCHANGE_NODE (id=21):(Total: 2s284ms, non-child: 64.926ms, % non-child: 2.84%)
  - ConvertRowBatchTime: 44.380ms
  - PeakMemoryUsage: 124.04 KB (127021)
  - RowsReturned: 287.51K (287514)
  - RowsReturnedRate: 125.88 K/sec
  Buffer pool:
    - AllocTime: 1.109ms
    - CumulativeAllocationBytes: 10.96 MB (11493376)
    - CumulativeAllocations: 562 (562)
    - PeakReservation: 112.00 KB (114688)
    - PeakUnpinnedBytes: 0
    - PeakUsedReservation: 112.00 KB (114688)
    - ReadIoBytes: 0
    - ReadIoOps: 0 (0)
    - ReadIoWaitTime: 0.000ns
    - WriteIoBytes: 0
    - WriteIoOps: 0 (0)
    - WriteIoWaitTime: 0.000ns
  Dequeue:
    BytesDequeued(500.000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700.00 KB, 2.00 MB, 3.49 MB, 4.39 MB, 5.86 MB, 6.85 MB
    - FirstBatchWaitTime: 0.000ns
    - TotalBytesDequeued: 6.85 MB (7187850)
    - TotalGetBatchTime: 2s237ms
      - DataWaitTime: 2s219ms
  Enqueue:
    BytesReceived(500.000ms): 0, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 328.73 KB, 963.79 KB, 1.64 MB, 2.09 MB, 2.76 MB, 3.23 MB
    DeferredQueueSize(500.000ms): 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0
    - DispatchTime: (Avg: 108.593us ; Min: 30.525us ; Max: 1.524ms ; Number of samples: 281)
    - DeserializeRowBatchTime: 8.395ms
    - TotalBatchesEnqueued: 281 (281)
    - TotalBatchesReceived: 281 (281)
    - TotalBytesReceived: 3.23 MB (3387144)
    - TotalEarlySenders: 0 (0)
    - TotalEosReceived: 1 (1)
    - TotalHasDeferredRPCsTime: 15s446ms
    - TotalRPCsDeferred: 38 (38)
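As a cross-check on the new deferred-RPC metrics: DeferredQueueSize sits at 1 for roughly
31 of the 500ms samples (about 15.5s), which lines up with TotalHasDeferredRPCsTime: 15s446ms.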
Sample sender's profile:
KrpcDataStreamSender (dst_id=21):(Total: 17s923ms, non-child: 604.494ms, % non-child: 3.37%)
  BytesSent(500.000ms): 0, 0, 0, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 46.54 KB, 46.54 KB, 46.54 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 974.44 KB, 2.82 MB, 4.93 MB, 6.27 MB, 8.28 MB, 9.69 MB
  - EosSent: 3 (3)
  - NetworkThroughput: 4.61 MB/sec
  - PeakMemoryUsage: 22.57 KB (23112)
  - RowsSent: 287.51K (287514)
  - RpcFailure: 0 (0)
  - RpcRetry: 0 (0)
  - SerializeBatchTime: 329.162ms
  - TotalBytesSent: 9.69 MB (10161432)
  - UncompressedRowBatchSize: 20.56 MB (21563550)
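NetworkThroughput above is a derived counter. On each TransmitData() completion the sender
adds the serialized batch size to TotalBytesSent and adds the RPC's wall time minus
receiver_latency_ns (the processing time the receiver reports back in its response) to an
aggregated network timer; throughput is then bytes divided by that network time. A rough
sketch of the arithmetic with simplified, hypothetical names (the patch itself uses
bytes_sent_counter_, total_network_timer_ and RuntimeProfile::UnitsPerSecond):

    #include <cstdint>

    struct SenderStats {
      int64_t total_bytes_sent = 0;  // shown as TotalBytesSent
      int64_t total_network_ns = 0;  // aggregated over completed RPCs
    };

    // Called from the TransmitData() completion callback.
    void OnRpcComplete(SenderStats* s, int64_t rpc_start_ns, int64_t rpc_end_ns,
                       int64_t receiver_latency_ns, int64_t serialized_bytes) {
      // Exclude the receiver's processing time so only network time (plus KRPC
      // queuing time) is attributed to throughput.
      s->total_network_ns += (rpc_end_ns - rpc_start_ns) - receiver_latency_ns;
      s->total_bytes_sent += serialized_bytes;
    }

    // NetworkThroughput in bytes/sec; guards against division by zero.
    double NetworkThroughputBytesPerSec(const SenderStats& s) {
      if (s.total_network_ns <= 0) return 0.0;
      return s.total_bytes_sent / (s.total_network_ns / 1e9);
    }

Sanity check against the sample above: 9.69 MB at 4.61 MB/sec implies about 2.1s of
aggregated network time.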
Change-Id: I8ba405921b3df920c1e85b940ce9c8d02fc647cd
Reviewed-on: http://gerrit.cloudera.org:8080/9690
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3855cb0f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3855cb0f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3855cb0f
Branch: refs/heads/2.x
Commit: 3855cb0f59b16d9051dc44efb08df7bb9a337e66
Parents: fc0af7f
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Mar 15 19:42:10 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 28 23:59:02 2018 +0000
----------------------------------------------------------------------
be/src/runtime/data-stream-test.cc | 2 -
be/src/runtime/krpc-data-stream-mgr.cc | 2 +-
be/src/runtime/krpc-data-stream-recvr.cc | 206 +++++++++++++++++--------
be/src/runtime/krpc-data-stream-recvr.h | 91 +++++++----
be/src/runtime/krpc-data-stream-sender.cc | 58 +++----
be/src/runtime/krpc-data-stream-sender.h | 16 +-
be/src/runtime/runtime-state.h | 6 +-
be/src/service/data-stream-service.cc | 21 ++-
be/src/service/data-stream-service.h | 5 +
common/protobuf/data_stream_service.proto | 6 +
10 files changed, 278 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index bd76b57..0a0d81e 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -746,7 +746,6 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
JoinSenders();
EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
- EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
}
TEST_P(DataStreamTest, UnknownSenderLargeResult) {
@@ -756,7 +755,6 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
StartSender();
JoinSenders();
EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
- EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
}
TEST_P(DataStreamTest, Cancel) {
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 5a9305f..cd8d90b 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -255,7 +255,7 @@ void KrpcDataStreamMgr::DeserializeThreadFn(int thread_id, const DeserializeTask
recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
DCHECK(recvr != nullptr || already_unregistered);
}
- if (recvr != nullptr) recvr->DequeueDeferredRpc(task.sender_id);
+ if (recvr != nullptr) recvr->ProcessDeferredRpc(task.sender_id);
}
void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index c7126d4..6e47bd6 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -25,6 +25,7 @@
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/krpc-data-stream-mgr.h"
@@ -35,6 +36,7 @@
#include "util/runtime-profile-counters.h"
#include "util/periodic-counter-updater.h"
#include "util/test-info.h"
+#include "util/time.h"
#include "gen-cpp/data_stream_service.pb.h"
@@ -43,6 +45,8 @@
DECLARE_bool(use_krpc);
DECLARE_int32(datastream_service_num_deserialization_threads);
+using kudu::MonoDelta;
+using kudu::MonoTime;
using kudu::rpc::RpcContext;
using std::condition_variable_any;
@@ -82,7 +86,7 @@ class KrpcDataStreamRecvr::SenderQueue {
// On success, the first entry of 'deferred_rpcs_' is removed and the sender of the RPC
// will be responded to. If the serialized row batch fails to be extracted from the
// entry, the error status will be sent as reply.
- void DequeueDeferredRpc();
+ void ProcessDeferredRpc();
// Takes over the RPC state 'ctx' of an early sender for deferred processing and
// kicks off a deserialization task to process it asynchronously. The ownership of
@@ -110,12 +114,26 @@ class KrpcDataStreamRecvr::SenderQueue {
// soft limit of the receiver to be exceeded. Expected to be called with lock_ held.
bool CanEnqueue(int64_t batch_size) const;
+ // Helper function for inserting 'payload' into 'deferred_rpcs_'. Also does some
+ // accounting for various counters.
+ void EnqueueDeferredRpc(unique_ptr<TransmitDataCtx> payload);
+
+ // Helper function for removing the first item from 'deferred_rpcs_'. Also does some
+ // accounting for various counters.
+ void DequeueDeferredRpc();
+
// Unpacks a serialized row batch from 'request' and 'rpc_context' and populates
- // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
- // be stored in 'batch_size'. On failure, the error status is returned.
+ // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
+ // stored in 'deserialized_size'. If 'serialized_size' is not NULL, also stores the
+ // serialized row batch size in it. On failure, the error status is returned.
static Status UnpackRequest(const TransmitDataRequestPB* request,
RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data,
- int64_t* batch_size);
+ int64_t* deserialized_size, int64_t* serialized_size = nullptr);
+
+ // Helper function to compute the serialized row batch size from 'request'
+ // and 'rpc_context'. Returns 0 on failure to unpack the serialized row batch.
+ static int64_t GetSerializedBatchSize(const TransmitDataRequestPB* request,
+ RpcContext* rpc_context);
// The workhorse function for deserializing a row batch represented by ('header',
// 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue'. Expects to be
@@ -183,6 +201,11 @@ class KrpcDataStreamRecvr::SenderQueue {
// full when they last tried to do so. The senders wait here until there is a space for
// their batches, allowing the receiver-side to implement basic flow-control.
std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
+
+ // Monotonic time in nanoseconds of when 'deferred_rpcs_' goes from being empty to
+ // non-empty. Set to 0 when 'deferred_rpcs_' becomes empty again. Used for computing
+ // 'total_has_deferred_rpcs_timer_'.
+ int64_t has_deferred_rpcs_start_time_ns_ = 0;
};
KrpcDataStreamRecvr::SenderQueue::SenderQueue(
@@ -190,7 +213,7 @@ KrpcDataStreamRecvr::SenderQueue::SenderQueue(
: recvr_(parent_recvr), num_remaining_senders_(num_senders) { }
Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
- SCOPED_TIMER(recvr_->queue_get_batch_time_);
+ SCOPED_TIMER(recvr_->queue_get_batch_timer_);
DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
DCHECK(!recvr_->closed_);
int num_to_dequeue = 0;
@@ -204,10 +227,15 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
// Wait until something shows up or we know we're done
while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) {
+ // Verify before waiting on 'data_arrival_cv_' that if there are any deferred
+ // batches, either an outstanding deserialization request is queued or an
+ // insertion is pending, so this thread is guaranteed to wake up at some point.
+ DCHECK(deferred_rpcs_.empty() ||
+ (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0);
VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
<< " node=" << recvr_->dest_node_id();
// Don't count time spent waiting on the sender as active time.
- CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
+ CANCEL_SAFE_SCOPED_TIMER(recvr_->data_wait_timer_, &is_cancelled_);
CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
CANCEL_SAFE_SCOPED_TIMER(
received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
@@ -248,7 +276,9 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
DCHECK(!batch_queue_.empty());
received_first_batch_ = true;
RowBatch* result = batch_queue_.front().second.release();
- recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+ int64_t batch_size = batch_queue_.front().first;
+ COUNTER_ADD(recvr_->bytes_dequeued_counter_, batch_size);
+ recvr_->num_buffered_bytes_.Add(-batch_size);
batch_queue_.pop_front();
VLOG_ROW << "fetched #rows=" << result->num_rows();
current_batch_.reset(result);
@@ -276,9 +306,29 @@ inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) con
return queue_empty || !recvr_->ExceedsLimit(batch_size);
}
+void KrpcDataStreamRecvr::SenderQueue::EnqueueDeferredRpc(
+ unique_ptr<TransmitDataCtx> payload) {
+ if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
+ deferred_rpcs_.push(move(payload));
+ recvr_->num_deferred_rpcs_.Add(1);
+ COUNTER_ADD(recvr_->total_deferred_rpcs_counter_, 1);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+ deferred_rpcs_.pop();
+ if (deferred_rpcs_.empty()) {
+ DCHECK_NE(has_deferred_rpcs_start_time_ns_, 0);
+ int64_t duration = MonotonicNanos() - has_deferred_rpcs_start_time_ns_;
+ COUNTER_ADD(recvr_->total_has_deferred_rpcs_timer_, duration);
+ has_deferred_rpcs_start_time_ns_ = 0;
+ }
+ recvr_->num_deferred_rpcs_.Add(-1);
+}
+
Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
const TransmitDataRequestPB* request, RpcContext* rpc_context,
- kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) {
+ kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* deserialized_size,
+ int64_t* serialized_size) {
// Unpack the tuple offsets.
KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
request->tuple_offsets_sidecar_idx(), tuple_offsets),
@@ -288,8 +338,12 @@ Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
request->tuple_data_sidecar_idx(), tuple_data),
"Failed to get the tuple data sidecar");
// Compute the size of the deserialized row batch.
- *batch_size =
+ *deserialized_size =
RowBatch::GetDeserializedSize(request->row_batch_header(), *tuple_offsets);
+ // Compute the size of the serialized row batch.
+ if (serialized_size != nullptr) {
+ *serialized_size = tuple_offsets->size() + tuple_data->size();
+ }
return Status::OK();
}
@@ -300,8 +354,6 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
DCHECK(lock->owns_lock());
DCHECK(!is_cancelled_);
- COUNTER_ADD(recvr_->num_received_batches_, 1);
- COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
// Reserve queue space before dropping the lock below.
recvr_->num_buffered_bytes_.Add(batch_size);
// Bump 'num_pending_enqueue_' to avoid race with Close() when lock is dropped below.
@@ -331,7 +383,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
return status;
}
VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
- COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
+ COUNTER_ADD(recvr_->total_enqueued_batches_counter_, 1);
batch_queue_.emplace_back(batch_size, move(batch));
data_arrival_cv_.notify_one();
return Status::OK();
@@ -344,14 +396,15 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
kudu::Slice tuple_offsets;
kudu::Slice tuple_data;
int64_t batch_size;
- COUNTER_ADD(recvr_->num_arrived_batches_, 1);
Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
&batch_size);
if (UNLIKELY(!status.ok())) {
- status.ToProto(response->mutable_status());
- rpc_context->RespondSuccess();
+ DataStreamService::RespondRpc(status, response, rpc_context);
return;
}
+ COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+ // To be consistent with the senders, only count the sidecars' size.
+ COUNTER_ADD(recvr_->bytes_received_counter_, tuple_data.size() + tuple_offsets.size());
{
unique_lock<SpinLock> l(lock_);
@@ -361,8 +414,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
// responded to if we reach here.
DCHECK_GT(num_remaining_senders_, 0);
if (UNLIKELY(is_cancelled_)) {
- Status::OK().ToProto(response->mutable_status());
- rpc_context->RespondSuccess();
+ DataStreamService::RespondRpc(Status::OK(), response, rpc_context);
return;
}
@@ -375,8 +427,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
recvr_->deferred_rpc_tracker()->Consume(rpc_context->GetTransferSize());
auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
- deferred_rpcs_.push(move(payload));
- COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+ EnqueueDeferredRpc(move(payload));
return;
}
@@ -385,11 +436,10 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
}
// Respond to the sender to ack the insertion of the row batches.
- status.ToProto(response->mutable_status());
- rpc_context->RespondSuccess();
+ DataStreamService::RespondRpc(status, response, rpc_context);
}
-void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+void KrpcDataStreamRecvr::SenderQueue::ProcessDeferredRpc() {
// Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
std::unique_ptr<TransmitDataCtx> ctx;
Status status;
@@ -413,7 +463,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
if (UNLIKELY(!status.ok())) {
DataStreamService::RespondAndReleaseRpc(status, ctx->response, ctx->rpc_context,
recvr_->deferred_rpc_tracker());
- deferred_rpcs_.pop();
+ DequeueDeferredRpc();
return;
}
@@ -426,7 +476,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
}
// Dequeues the deferred batch and adds it to 'batch_queue_'.
- deferred_rpcs_.pop();
+ DequeueDeferredRpc();
const RowBatchHeaderPB& header = ctx->request->row_batch_header();
status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
DCHECK(!status.ok() || !batch_queue_.empty());
@@ -437,8 +487,20 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
// Responds to the sender to ack the insertion of the row batches.
// No need to hold lock when enqueuing the response.
- status.ToProto(ctx->response->mutable_status());
- ctx->rpc_context->RespondSuccess();
+ DataStreamService::RespondRpc(status, ctx->response, ctx->rpc_context);
+}
+
+int64_t KrpcDataStreamRecvr::SenderQueue::GetSerializedBatchSize(
+ const TransmitDataRequestPB* request, RpcContext* rpc_context) {
+ kudu::Slice tuple_offsets;
+ kudu::Slice tuple_data;
+ int64_t unused;
+ int64_t serialized_size = 0;
+ if (UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data, &unused,
+ &serialized_size).ok()) {
+ return serialized_size;
+ }
+ return 0;
}
void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
@@ -448,21 +510,21 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
// 'recvr_->mgr_' shouldn't be NULL.
DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
DCHECK(!recvr_->closed_ && recvr_->mgr_ != nullptr);
+ COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+ COUNTER_ADD(recvr_->bytes_received_counter_,
+ GetSerializedBatchSize(ctx->request, ctx->rpc_context));
int sender_id = ctx->request->sender_id();
- COUNTER_ADD(recvr_->num_arrived_batches_, 1);
{
lock_guard<SpinLock> l(lock_);
if (UNLIKELY(is_cancelled_)) {
- Status::OK().ToProto(ctx->response->mutable_status());
- ctx->rpc_context->RespondSuccess();
+ DataStreamService::RespondRpc(Status::OK(), ctx->response, ctx->rpc_context);
return;
}
// Only enqueue a deferred RPC if the sender queue is not yet cancelled.
recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
- deferred_rpcs_.push(move(ctx));
+ EnqueueDeferredRpc(move(ctx));
++num_deserialize_tasks_pending_;
}
- COUNTER_ADD(recvr_->num_deferred_batches_, 1);
recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
recvr_->dest_node_id(), sender_id, 1);
}
@@ -489,7 +551,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
const unique_ptr<TransmitDataCtx>& ctx = deferred_rpcs_.front();
DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
ctx->rpc_context, recvr_->deferred_rpc_tracker());
- deferred_rpcs_.pop();
+ DequeueDeferredRpc();
}
}
VLOG_QUERY << "cancelled stream: fragment_instance_id="
@@ -565,8 +627,8 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
parent_tracker_(parent_tracker),
buffer_pool_client_(client),
profile_(profile),
- recvr_side_profile_(RuntimeProfile::Create(&pool_, "RecvrSide")),
- sender_side_profile_(RuntimeProfile::Create(&pool_, "SenderSide")) {
+ dequeue_profile_(RuntimeProfile::Create(&pool_, "Dequeue")),
+ enqueue_profile_(RuntimeProfile::Create(&pool_, "Enqueue")) {
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
sender_queues_.reserve(num_queues);
@@ -576,36 +638,47 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
sender_queues_.push_back(queue);
}
- // Add the receiver and sender sides' profiles as children of the owning exchange
- // node's profile.
- profile_->AddChild(recvr_side_profile_);
- profile_->AddChild(sender_side_profile_);
+ // Add the profiles of the dequeuing side (i.e. GetBatch()) and the enqueuing side
+ // (i.e. AddBatchWork()) as children of the owning exchange node's profile.
+ profile_->AddChild(dequeue_profile_);
+ profile_->AddChild(enqueue_profile_);
+
+ // Initialize various counters for measuring dequeuing from queues.
+ bytes_dequeued_counter_ =
+ ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
+ bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+ dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
+ queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
+ data_wait_timer_ =
+ ADD_CHILD_TIMER(dequeue_profile_, "DataWaitTime", "TotalGetBatchTime");
+ inactive_timer_ = profile_->inactive_timer();
+ first_batch_wait_total_timer_ =
+ ADD_TIMER(dequeue_profile_, "FirstBatchWaitTime");
- // Initialize the counters
+ // Initialize various counters for measuring enqueuing into queues.
bytes_received_counter_ =
- ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
+ ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
- recvr_side_profile_, "BytesReceived", bytes_received_counter_);
- queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
- data_arrival_timer_ =
- ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
- inactive_timer_ = profile_->inactive_timer();
- first_batch_wait_total_timer_ =
- ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
+ enqueue_profile_, "BytesReceived", bytes_received_counter_);
deserialize_row_batch_timer_ =
- ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
- num_early_senders_ =
- ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
- num_arrived_batches_ =
- ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
- num_received_batches_ =
- ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
- num_enqueued_batches_ =
- ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
- num_deferred_batches_ =
- ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
- num_eos_received_ =
- ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
+ ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
+ total_eos_received_counter_ =
+ ADD_COUNTER(enqueue_profile_, "TotalEosReceived", TUnit::UNIT);
+ total_early_senders_counter_ =
+ ADD_COUNTER(enqueue_profile_, "TotalEarlySenders", TUnit::UNIT);
+ total_received_batches_counter_ =
+ ADD_COUNTER(enqueue_profile_, "TotalBatchesReceived", TUnit::UNIT);
+ total_enqueued_batches_counter_ =
+ ADD_COUNTER(enqueue_profile_, "TotalBatchesEnqueued", TUnit::UNIT);
+ total_deferred_rpcs_counter_ =
+ ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
+ deferred_rpcs_time_series_counter_ =
+ enqueue_profile_->AddTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
+ bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+ total_has_deferred_rpcs_timer_ =
+ ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
+ dispatch_timer_ =
+ ADD_SUMMARY_STATS_TIMER(enqueue_profile_, "DispatchTime");
}
Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -616,28 +689,30 @@ Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
TransmitDataResponsePB* response, RpcContext* rpc_context) {
+ MonoDelta duration(MonoTime::Now().GetDeltaSince(rpc_context->GetTimeReceived()));
+ dispatch_timer_->UpdateCounter(duration.ToNanoseconds());
int use_sender_id = is_merging_ ? request->sender_id() : 0;
// Add all batches to the same queue if is_merging_ is false.
sender_queues_[use_sender_id]->AddBatch(request, response, rpc_context);
}
-void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) {
+void KrpcDataStreamRecvr::ProcessDeferredRpc(int sender_id) {
int use_sender_id = is_merging_ ? sender_id : 0;
// Add all batches to the same queue if is_merging_ is false.
- sender_queues_[use_sender_id]->DequeueDeferredRpc();
+ sender_queues_[use_sender_id]->ProcessDeferredRpc();
}
void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
int use_sender_id = is_merging_ ? ctx->request->sender_id() : 0;
// Add all batches to the same queue if is_merging_ is false.
sender_queues_[use_sender_id]->TakeOverEarlySender(move(ctx));
- COUNTER_ADD(num_early_senders_, 1);
+ COUNTER_ADD(total_early_senders_counter_, 1);
}
void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
int use_sender_id = is_merging_ ? sender_id : 0;
sender_queues_[use_sender_id]->DecrementSenders();
- COUNTER_ADD(num_eos_received_, 1);
+ COUNTER_ADD(total_eos_received_counter_, 1);
}
void KrpcDataStreamRecvr::CancelStream() {
@@ -660,7 +735,8 @@ void KrpcDataStreamRecvr::Close() {
// Given all queues have been cancelled and closed already at this point, it's safe to
// call Close() on 'deferred_rpc_tracker_' without holding any lock here.
deferred_rpc_tracker_->Close();
- recvr_side_profile_->StopPeriodicCounters();
+ dequeue_profile_->StopPeriodicCounters();
+ enqueue_profile_->StopPeriodicCounters();
// Remove reference to the unowned resources which may be freed after Close().
mgr_ = nullptr;
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 845dbf5..1435e44 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -145,7 +145,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
/// identified by 'sender_id'. If is_merging_ is false, it always defaults to
/// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id_'.
/// Called from KrpcDataStreamMgr's deserialization threads only.
- void DequeueDeferredRpc(int sender_id);
+ void ProcessDeferredRpc(int sender_id);
/// Takes over the RPC state 'ctx' of an early sender for deferred processing and
/// kicks off a deserialization task to process it asynchronously. This makes sure
@@ -167,6 +167,9 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
}
+ /// Return the current number of deferred RPCs.
+ int64_t num_deferred_rpcs() const { return num_deferred_rpcs_.Load(); }
+
/// KrpcDataStreamMgr instance used to create this recvr. Not owned.
KrpcDataStreamMgr* mgr_;
@@ -190,9 +193,12 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
/// from the fragment execution thread.
bool closed_;
- /// total number of bytes held across all sender queues.
+ /// Current number of bytes held across all sender queues.
AtomicInt32 num_buffered_bytes_;
+ /// Current number of outstanding deferred RPCs across all sender queues.
+ AtomicInt64 num_deferred_rpcs_;
+
/// Memtracker for payloads of deferred Rpcs in the sender queue(s). This must be
/// accessed with a sender queue's lock held to avoid race with Close() of the queue.
boost::scoped_ptr<MemTracker> deferred_rpc_tracker_;
@@ -217,60 +223,79 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
ObjectPool pool_;
/// Runtime profile of the owning exchange node. It's the parent of
- /// 'recvr_side_profile_' and 'sender_side_profile_'. Not owned.
+ /// 'dequeue_profile_' and 'enqueue_profile_'. Not owned.
RuntimeProfile* profile_;
/// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
/// and sender side measurements (from AddBatch()). These two profiles own all counters
/// below unless otherwise noted. These profiles are owned by the receiver and placed
- /// in 'pool_'. 'recvr_side_profile_' and 'sender_side_profile_' must outlive 'profile_'
+ /// in 'pool_'. 'dequeue_profile_' and 'enqueue_profile_' must outlive 'profile_'
/// to prevent accessing freed memory during top-down traversal of 'profile_'. The
/// receiver is co-owned by the exchange node and the data stream manager so these two
/// profiles should outlive the exchange node which owns 'profile_'.
- RuntimeProfile* recvr_side_profile_;
- RuntimeProfile* sender_side_profile_;
+ RuntimeProfile* dequeue_profile_;
+ RuntimeProfile* enqueue_profile_;
- /// Number of bytes received but not necessarily enqueued.
- RuntimeProfile::Counter* bytes_received_counter_;
+ /// Pointer to profile's inactive timer. Not owned.
+ /// Not directly shown in the profile, unlike 'data_wait_timer_' below. Used for
+ /// subtracting the wait time from the total time spent in the exchange node.
+ RuntimeProfile::Counter* inactive_timer_;
- /// Time series of number of bytes received, samples bytes_received_counter_.
- RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
+ /// ------------------------------------------------------------------------------------
+ /// Following counters belong to 'dequeue_profile_'.
- /// Total wall-clock time spent deserializing row batches.
- RuntimeProfile::Counter* deserialize_row_batch_timer_;
+ /// Number of bytes of deserialized row batches dequeued.
+ RuntimeProfile::Counter* bytes_dequeued_counter_;
+
+ /// Time series of bytes of deserialized row batches, samples 'bytes_dequeued_counter_'.
+ RuntimeProfile::TimeSeriesCounter* bytes_dequeued_time_series_counter_;
+
+ /// Total wall-clock time spent in SenderQueue::GetBatch().
+ RuntimeProfile::Counter* queue_get_batch_timer_;
- /// Number of senders which arrive before the receiver is ready.
- RuntimeProfile::Counter* num_early_senders_;
+ /// Total wall-clock time spent waiting for data to be available in queues.
+ RuntimeProfile::Counter* data_wait_timer_;
- /// Time spent waiting until the first batch arrives across all queues.
- /// TODO: Turn this into a wall-clock timer.
+ /// Wall-clock time spent waiting for the first batch arrival across all queues.
RuntimeProfile::Counter* first_batch_wait_total_timer_;
- /// Total number of batches which arrived at this receiver but not necessarily received
- /// or enqueued. An arrived row batch will eventually be received if there is no error
- /// unpacking the RPC payload and the receiving stream is not cancelled.
- RuntimeProfile::Counter* num_arrived_batches_;
+ /// ------------------------------------------------------------------------------------
+ /// Following counters belong to 'enqueue_profile_'.
- /// Total number of batches received but not necessarily enqueued.
- RuntimeProfile::Counter* num_received_batches_;
+ /// Total number of bytes of serialized row batches received.
+ RuntimeProfile::Counter* bytes_received_counter_;
- /// Total number of batches received and enqueued into the row batch queue.
- RuntimeProfile::Counter* num_enqueued_batches_;
+ /// Time series of number of bytes received, samples 'bytes_received_counter_'.
+ RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
- /// Total number of batches deferred because of early senders or full row batch queue.
- RuntimeProfile::Counter* num_deferred_batches_;
+ /// Total wall-clock time spent deserializing row batches.
+ RuntimeProfile::Counter* deserialize_row_batch_timer_;
/// Total number of EOS received.
- RuntimeProfile::Counter* num_eos_received_;
+ RuntimeProfile::Counter* total_eos_received_counter_;
- /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
- RuntimeProfile::Counter* data_arrival_timer_;
+ /// Total number of senders which arrive before the receiver is ready.
+ RuntimeProfile::Counter* total_early_senders_counter_;
- /// Pointer to profile's inactive timer. Not owned.
- RuntimeProfile::Counter* inactive_timer_;
+ /// Total number of serialized row batches received.
+ RuntimeProfile::Counter* total_received_batches_counter_;
+
+ /// Total number of deserialized row batches enqueued into the row batch queues.
+ RuntimeProfile::Counter* total_enqueued_batches_counter_;
+
+ /// Total number of RPCs whose responses are deferred because of early senders or
+ /// full row batch queue.
+ RuntimeProfile::Counter* total_deferred_rpcs_counter_;
+
+ /// Time series of number of deferred row batches, samples 'num_deferred_rpcs_'.
+ RuntimeProfile::TimeSeriesCounter* deferred_rpcs_time_series_counter_;
+
+ /// Total wall-clock time in which the 'deferred_rpcs_' queues are not empty.
+ RuntimeProfile::Counter* total_has_deferred_rpcs_timer_;
- /// Total time spent in SenderQueue::GetBatch().
- RuntimeProfile::Counter* queue_get_batch_time_;
+ /// Summary stats of the time RPCs spend in the KRPC service queue before
+ /// being dispatched to the RPC handlers.
+ RuntimeProfile::SummaryStatsCounter* dispatch_timer_;
};
} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index bc6a07d..0f11dec 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -142,8 +142,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
// This function blocks until the EOS RPC is complete.
Status FlushAndSendEos(RuntimeState* state);
- int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
-
// The type for a RPC worker function.
typedef boost::function<Status()> DoRpcFn;
@@ -160,9 +158,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
const TUniqueId fragment_instance_id_;
const PlanNodeId dest_node_id_;
- // Number of bytes of all serialized row batches sent successfully.
- int64_t num_data_bytes_sent_ = 0;
-
// The row batch for accumulating rows copied from AddRow().
// Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
scoped_ptr<RowBatch> batch_;
@@ -211,6 +206,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
// The pointer to the current serialized row batch being sent.
const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;
+ // The monotonic time in nanoseconds of when the current RPC started.
+ int64_t rpc_start_time_ns_ = 0;
+
// True if there is an in-flight RPC.
bool rpc_in_flight_ = false;
@@ -311,12 +309,14 @@ void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
rpc_in_flight_ = false;
rpc_in_flight_batch_ = nullptr;
rpc_done_cv_.notify_one();
+ rpc_start_time_ns_ = 0;
}
Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
DCHECK(lock != nullptr);
DCHECK(lock->owns_lock());
+ SCOPED_TIMER(parent_->profile()->inactive_timer());
SCOPED_TIMER(parent_->state_->total_network_send_timer());
// Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled.
@@ -379,17 +379,20 @@ void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
}
void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
+ DCHECK_NE(rpc_start_time_ns_, 0);
+ int64_t total_time = MonotonicNanos() - rpc_start_time_ns_;
std::unique_lock<SpinLock> l(lock_);
DCHECK(rpc_in_flight_);
const kudu::Status controller_status = rpc_controller_.status();
if (LIKELY(controller_status.ok())) {
+ DCHECK(rpc_in_flight_batch_ != nullptr);
+ COUNTER_ADD(parent_->bytes_sent_counter_,
+ RowBatch::GetSerializedSize(*rpc_in_flight_batch_));
+ int64_t network_time = total_time - resp_.receiver_latency_ns();
+ COUNTER_ADD(&parent_->total_network_timer_, network_time);
Status rpc_status = Status::OK();
int32_t status_code = resp_.status().status_code();
- if (LIKELY(status_code == TErrorCode::OK)) {
- DCHECK(rpc_in_flight_batch_ != nullptr);
- num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
- VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
- } else if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
+ if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
remote_recvr_closed_ = true;
} else {
rpc_status = Status(resp_.status());
@@ -429,6 +432,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
req.set_tuple_offsets_sidecar_idx(sidecar_idx);
// Add 'tuple_data_' as sidecar.
+ rpc_start_time_ns_ = MonotonicNanos();
KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
"Unable to add tuple data to sidecar");
@@ -572,6 +576,7 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r
sender_id_(sender_id),
partition_type_(sink.output_partition.type),
per_channel_buffer_size_(per_channel_buffer_size),
+ total_network_timer_(TUnit::TIME_NS, 0),
dest_node_id_(sink.dest_node_id),
next_unknown_partition_(0) {
DCHECK_GT(destinations.size(), 0);
@@ -605,6 +610,7 @@ KrpcDataStreamSender::~KrpcDataStreamSender() {
Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
const TDataSink& tsink, RuntimeState* state) {
+ SCOPED_TIMER(profile_->total_time_counter());
DCHECK(tsink.__isset.stream_sink);
if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
partition_type_ == TPartitionType::KUDU) {
@@ -625,15 +631,17 @@ Status KrpcDataStreamSender::Prepare(
serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
- bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+ bytes_sent_counter_ = ADD_COUNTER(profile(), "TotalBytesSent", TUnit::BYTES);
+ bytes_sent_time_series_counter_ =
+ ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
+ network_throughput_counter_ =
+ profile()->AddDerivedCounter("NetworkThroughput", TUnit::BYTES_PER_SECOND,
+ bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
+ &total_network_timer_));
eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
uncompressed_bytes_counter_ =
ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
- total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
- overall_throughput_ =
- profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
- bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
- profile()->total_time_counter()));
+ total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsSent", TUnit::UNIT);
for (int i = 0; i < channels_.size(); ++i) {
RETURN_IF_ERROR(channels_[i]->Init(state));
}
@@ -641,17 +649,19 @@ Status KrpcDataStreamSender::Prepare(
}
Status KrpcDataStreamSender::Open(RuntimeState* state) {
+ SCOPED_TIMER(profile_->total_time_counter());
return ScalarExprEvaluator::Open(partition_expr_evals_, state);
}
Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+ SCOPED_TIMER(profile()->total_time_counter());
DCHECK(!closed_);
DCHECK(!flushed_);
if (batch->num_rows() == 0) return Status::OK();
if (partition_type_ == TPartitionType::UNPARTITIONED) {
OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
- RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch));
+ RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
// TransmitData() will block if there are still in-flight rpcs (and those will
// reference the previously written serialized batch).
for (int i = 0; i < channels_.size(); ++i) {
@@ -736,6 +746,7 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
}
Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+ SCOPED_TIMER(profile()->total_time_counter());
DCHECK(!flushed_);
DCHECK(!closed_);
flushed_ = true;
@@ -749,12 +760,14 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
}
void KrpcDataStreamSender::Close(RuntimeState* state) {
+ SCOPED_TIMER(profile()->total_time_counter());
if (closed_) return;
for (int i = 0; i < channels_.size(); ++i) {
channels_[i]->Teardown(state);
}
ScalarExprEvaluator::Close(partition_expr_evals_, state);
ScalarExpr::Close(partition_exprs_);
+ profile()->StopPeriodicCounters();
DataSink::Close(state);
}
@@ -762,25 +775,16 @@ Status KrpcDataStreamSender::SerializeBatch(
RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
VLOG_ROW << "serializing " << src->num_rows() << " rows";
{
- SCOPED_TIMER(profile_->total_time_counter());
SCOPED_TIMER(serialize_batch_timer_);
RETURN_IF_ERROR(src->Serialize(dest));
- int64_t bytes = RowBatch::GetSerializedSize(*dest);
int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
- COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
}
return Status::OK();
}
int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
- // TODO: do we need synchronization here or are reads & writes to 8-byte ints
- // atomic?
- int64_t result = 0;
- for (int i = 0; i < channels_.size(); ++i) {
- result += channels_[i]->num_data_bytes_sent();
- }
- return result;
+ return bytes_sent_counter_->value();
}
} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index bc18574..e6c6ccf 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,20 +157,28 @@ class KrpcDataStreamSender : public DataSink {
/// Total number of times RPC fails or the remote responds with a non-retryable error.
RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
- /// Total number of bytes sent.
+ /// Total number of bytes sent. Updated on RPC completion.
RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
+ /// Time series of number of bytes sent, samples bytes_sent_counter_.
+ RuntimeProfile::TimeSeriesCounter* bytes_sent_time_series_counter_ = nullptr;
+
/// Total number of EOS sent.
RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
- /// Total number of bytes of the row batches before compression.
+ /// Total number of bytes of row batches before compression.
RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
/// Total number of rows sent.
RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
- /// Throughput per total time spent in sender
- RuntimeProfile::Counter* overall_throughput_ = nullptr;
+ /// Approximate network throughput for sending row batches.
+ RuntimeProfile::Counter* network_throughput_counter_ = nullptr;
+
+ /// Aggregated time spent in the network (including queuing time in the KRPC transfer
+ /// queue) for transmitting the RPC requests and receiving the responses. Used for
+ /// computing 'network_throughput_counter_'. Not meaningful by itself so not shown in the profile.
+ RuntimeProfile::Counter total_network_timer_;
/// Identifier of the destination plan node.
PlanNodeId dest_node_id_;
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b292789..7edd718 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -356,7 +356,11 @@ class RuntimeState {
/// Total time waiting in storage (across all threads)
RuntimeProfile::Counter* total_storage_wait_timer_;
- /// Total time spent sending over the network (across all threads)
+ /// Total time spent waiting for RPCs to complete. This time is a combination of:
+ /// - network time of sending the RPC payload to the destination
+ /// - processing and queuing time in the destination
+ /// - network time of sending the RPC response to the originating node
+ /// TODO: rename this counter and account for the 3 components above. IMPALA-6705.
RuntimeProfile::Counter* total_network_send_timer_;
/// Total time spent receiving over the network (across all threads)
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index d94837b..1d42a99 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
#include "rpc/rpc-mgr.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/exec-env.h"
@@ -36,6 +37,8 @@
#include "common/names.h"
using kudu::rpc::RpcContext;
+using kudu::MonoDelta;
+using kudu::MonoTime;
static const string queue_limit_msg = "(Advanced) Limit on RPC payloads consumption for "
"DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
@@ -82,13 +85,27 @@ void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
}
template<typename ResponsePBType>
+void DataStreamService::RespondRpc(const Status& status,
+ ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
+ MonoDelta duration(MonoTime::Now().GetDeltaSince(ctx->GetTimeReceived()));
+ status.ToProto(response->mutable_status());
+ response->set_receiver_latency_ns(duration.ToNanoseconds());
+ ctx->RespondSuccess();
+}
+
+template<typename ResponsePBType>
void DataStreamService::RespondAndReleaseRpc(const Status& status,
ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker) {
mem_tracker->Release(ctx->GetTransferSize());
- status.ToProto(response->mutable_status());
- ctx->RespondSuccess();
+ RespondRpc(status, response, ctx);
}
+template void DataStreamService::RespondRpc(const Status& status,
+ TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx);
+
+template void DataStreamService::RespondRpc(const Status& status,
+ EndDataStreamResponsePB* response, kudu::rpc::RpcContext* ctx);
+
template void DataStreamService::RespondAndReleaseRpc(const Status& status,
TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx,
MemTracker* mem_tracker);
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 812fb2c..e233165 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -62,6 +62,11 @@ class DataStreamService : public DataStreamServiceIf {
static void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
/// Respond to an RPC passed in 'response'/'ctx' with 'status'. Takes ownership of 'ctx'.
+ template<typename ResponsePBType>
+ static void RespondRpc(const Status& status, ResponsePBType* response,
+ kudu::rpc::RpcContext* ctx);
+
MemTracker* mem_tracker() { return mem_tracker_.get(); }
private:
http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index c2045d2..854eb87 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -49,6 +49,9 @@ message TransmitDataRequestPB {
message TransmitDataResponsePB {
// Status::OK() on success; Error status on failure.
optional StatusPB status = 1;
+
+ // Latency for response in the receiving daemon in nanoseconds.
+ optional int64 receiver_latency_ns = 2;
}
// All fields are required in V1.
@@ -66,6 +69,9 @@ message EndDataStreamRequestPB {
// All fields are required in V1.
message EndDataStreamResponsePB {
optional StatusPB status = 1;
+
+ // Latency for response in the receiving daemon in nanoseconds.
+ optional int64 receiver_latency_ns = 2;
}
// Handles data transmission between fragment instances.