You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/02/28 08:21:08 UTC

[2/3] impala git commit: IMPALA-6565: Fix some bugs in KrpcDataStreamRecvr

IMPALA-6565: Fix some bugs in KrpcDataStreamRecvr

This change fixes a couple of issues found with stress test
(concurrent_select.py)

1. The local variable 'status' was mistakenly defined twice in
DequeueDeferredRpc(): one in the outer scope and one in the
inner scope. This causes the error status of AddBatchWork()
to be dropped when the inner scope ends. As a result, the error
status from AddBatchWork() (e.g. MemLimitExceeded) will not be
propagated back to the sender and the receiver will continue
to operate with some missing data, leading to wrong query
results. This change fixes the problem by removing the redefinition
of the status local variable. It also adds some counters in the
profile to make diagnostics of failed RPCs or missing EOS easier.

2. AddBatchWork() may return early without enqueuing a row
batch if it encounters a failure creating a row batch. The bug
is that it may return early without notifying threads waiting
on 'data_arrival_cv_', causing threads waiting for
'num_pending_enqueue_' to reach 0 to wait indefinitely. This may
cause some fragment instances to stick around after the query
has been cancelled.

3. This one did not manifest during the stress test and is benign.
There is a missing check for 'is_cancelled_' in TakeOverEarlySender()
before enqueuing an early sender into the 'deferred_rpcs_' queue.
The bug is benign because TakeOverEarlySender() is called in the
fragment execution thread's context, so that thread cannot close the
receiver and call CancelStream() on it until it returns from
CreateRecvr(), unless the query is cancelled, in which case the
sender should also get the cancellation. That said, it should
be fixed. This change checks for the 'is_cancelled_' flag before
enqueuing an early sender into the 'deferred_rpcs_' queue.

Testing done: Stress test consistently reproduced the problems before.
With this fix, no wrong results have been seen after 2 iterations
of stress tests, which translates to about 20000 queries being run.

Change-Id: I6b2985a47021ebd4a970861040e7474aca7941b5
Reviewed-on: http://gerrit.cloudera.org:8080/9439
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4c8b02ff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4c8b02ff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4c8b02ff

Branch: refs/heads/master
Commit: 4c8b02ffc43e8346440a45f6fee7b2b761a006fd
Parents: 065974c
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 22 16:19:50 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 01:17:29 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/krpc-data-stream-recvr.cc  | 34 ++++++++++++++++++++------
 be/src/runtime/krpc-data-stream-recvr.h   | 19 +++++++++++---
 be/src/runtime/krpc-data-stream-sender.cc |  5 ++++
 be/src/runtime/krpc-data-stream-sender.h  |  6 +++++
 4 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 2f379f4..4f85996 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -298,7 +298,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
-  COUNTER_ADD(recvr_->num_accepted_batches_, 1);
+  COUNTER_ADD(recvr_->num_received_batches_, 1);
   COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
@@ -324,9 +324,11 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   --num_pending_enqueue_;
   if (UNLIKELY(!status.ok())) {
     recvr_->num_buffered_bytes_.Add(-batch_size);
+    data_arrival_cv_.notify_one();
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+  COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
   return Status::OK();
@@ -347,6 +349,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   kudu::Slice tuple_offsets;
   kudu::Slice tuple_data;
   int64_t batch_size;
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
       &batch_size);
   if (UNLIKELY(!status.ok())) {
@@ -400,15 +403,16 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     DCHECK_GT(num_deserialize_tasks_pending_, 0);
     --num_deserialize_tasks_pending_;
 
-    // Returns if the queue has been cancelled or if it's empty.
-    if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return;
+    // Returns if the queue is empty. The queue may be drained in Cancel().
+    if (deferred_rpcs_.empty()) return;
+    DCHECK(!is_cancelled_);
 
     // Try enqueuing the first entry into 'batch_queue_'.
     ctx.swap(deferred_rpcs_.front());
     kudu::Slice tuple_offsets;
     kudu::Slice tuple_data;
     int64_t batch_size;
-    Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
+    status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
@@ -429,6 +433,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     deferred_rpcs_.pop();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    DCHECK(!status.ok() || !batch_queue_.empty());
   }
 
   // Responds to the sender to ack the insertion of the row batches.
@@ -439,12 +444,17 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
   recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
-  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
+    if (UNLIKELY(is_cancelled_)) {
+      RespondAndReleaseRpc(Status::OK(), ctx);
+      return;
+    }
     deferred_rpcs_.push(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
+  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
       recvr_->dest_node_id(), sender_id, 1);
 }
@@ -489,6 +499,9 @@ void KrpcDataStreamRecvr::SenderQueue::Close() {
   // risk running into a race which can leak row batches. Please see IMPALA-3034.
   DCHECK(is_cancelled_);
 
+  // The deferred RPCs should all have been responded to in Cancel().
+  DCHECK(deferred_rpcs_.empty());
+
   // Wait for any pending insertion to complete first.
   while (num_pending_enqueue_ > 0) data_arrival_cv_.wait(l);
 
@@ -566,10 +579,16 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   inactive_timer_ = profile_->inactive_timer();
   num_early_senders_ =
       ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
+  num_arrived_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
+  num_received_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
+  num_enqueued_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
   num_deferred_batches_ =
       ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
-  num_accepted_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT);
+  num_eos_received_ =
+      ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -600,6 +619,7 @@ void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
 void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   sender_queues_[use_sender_id]->DecrementSenders();
+  COUNTER_ADD(num_eos_received_, 1);
 }
 
 void KrpcDataStreamRecvr::CancelStream() {

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index f4c2a5e..758079b 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -195,7 +195,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   RuntimeProfile* recvr_side_profile_;
   RuntimeProfile* sender_side_profile_;
 
-  /// Number of bytes received.
+  /// Number of bytes received but not necessarily enqueued.
   RuntimeProfile::Counter* bytes_received_counter_;
 
   /// Time series of number of bytes received, samples bytes_received_counter_
@@ -211,11 +211,22 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// TODO: Turn this into a wall-clock timer.
   RuntimeProfile::Counter* first_batch_wait_total_timer_;
 
-  /// Total number of batches received and deferred as sender queue is full.
+  /// Total number of batches which arrived at this receiver but not necessarily received
+  /// or enqueued. An arrived row batch will eventually be received if there is no error
+  /// unpacking the RPC payload and the receiving stream is not cancelled.
+  RuntimeProfile::Counter* num_arrived_batches_;
+
+  /// Total number of batches received but not necessarily enqueued.
+  RuntimeProfile::Counter* num_received_batches_;
+
+  /// Total number of batches received and enqueued into the row batch queue.
+  RuntimeProfile::Counter* num_enqueued_batches_;
+
+  /// Total number of batches deferred because of early senders or full row batch queue.
   RuntimeProfile::Counter* num_deferred_batches_;
 
-  /// Total number of batches received and accepted into the sender queue.
-  RuntimeProfile::Counter* num_accepted_batches_;
+  /// Total number of EOS received.
+  RuntimeProfile::Counter* num_eos_received_;
 
   /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
   RuntimeProfile::Counter* data_arrival_timer_;

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6c0ad01..d758826 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -306,6 +306,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
 }
 
 void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
+  if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1);
   rpc_status_ = status;
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
@@ -535,9 +536,11 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
     std::unique_lock<SpinLock> l(lock_);
     RETURN_IF_ERROR(WaitForRpc(&l));
     DCHECK(!rpc_in_flight_);
+    DCHECK(rpc_status_.ok());
     if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
     VLOG_RPC << "calling EndDataStream() to terminate channel.";
     rpc_in_flight_ = true;
+    COUNTER_ADD(parent_->eos_sent_counter_, 1);
     RETURN_IF_ERROR(DoEndDataStreamRpc());
     RETURN_IF_ERROR(WaitForRpc(&l));
   }
@@ -619,7 +622,9 @@ Status KrpcDataStreamSender::Prepare(
       &partition_expr_evals_));
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
+  rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
   bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
   total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index ce0851e..0c3a32e 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,9 +157,15 @@ class KrpcDataStreamSender : public DataSink {
   /// Number of TransmitData() RPC retries due to remote service being busy.
   RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
 
+  /// Total number of times RPC fails or the remote responds with a non-retryable error.
+  RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
+
   /// Total number of bytes sent.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
+  /// Total number of EOS sent.
+  RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
+
   /// Total number of bytes of the row batches before compression.
   RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;