You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@impala.apache.org by "Michael Ho (Code Review)" <ge...@cloudera.org> on 2017/10/06 17:58:27 UTC
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )
Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................
Patch Set 2:
(2 comments)
http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:
http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141
PS2, Line 141: while (true) {
: // wait until something shows up or we know we're done
: while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty()
: && num_remaining_senders_ > 0) {
: VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
: << " node=" << recvr_->dest_node_id();
: // Don't count time spent waiting on the sender as active time.
: CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
: CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
: CANCEL_SAFE_SCOPED_TIMER(
: received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
: &is_cancelled_);
: data_arrival_cv_.wait(l);
: }
:
: if (is_cancelled_) return Status::CANCELLED;
:
: if (blocked_senders_.empty() && batch_queue_.empty()) {
: DCHECK_EQ(num_remaining_senders_, 0);
: return Status::OK();
: }
:
: received_first_batch_ = true;
:
: // Either we'll consume a row batch from batch_queue_, or it's empty. In either case,
: // take a blocked sender and retry delivering their batch. There is a window between
: // which a deferred batch is dequeued from blocked_senders_ queue and when it's
: // inserted into batch_queue_. However, a receiver won't respond to the sender until
: // the deferred row batch has been inserted. The sender will wait for all in-flight
: // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0.
: if (!blocked_senders_.empty()) {
: recvr_->mgr_->EnqueueRowBatch(
: {recvr_->fragment_instance_id(), move(blocked_senders_.front())});
: blocked_senders_.pop();
: }
:
: if (!batch_queue_.empty()) {
: RowBatch* result = batch_queue_.front().second;
: recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
: VLOG_ROW << "fetched #rows=" << result->num_rows();
: current_batch_.reset(result);
: *next_batch = current_batch_.get();
: batch_queue_.pop_front();
: return Status::OK();
: }
This loop may lead to live lock in the rare case in which blocked_senders_ and batch_queue_ are both empty in the window in which the deserialization threads are still working on inserting the row batches in blocked_senders_ into batch_queue_ after batch_queue_ has been exhausted. Spinning here forever will not drop the lock, causing the deserialization threads to wait forever to insert into batch_queue_.
http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc
File be/src/runtime/krpc-data-stream-sender.cc:
http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@485
PS2, Line 485: // Sleep for sometime before retrying.
: if (RpcMgr::IsServerTooBusy(rpc_controller_)) {
: SleepForMs(FLAGS_rpc_retry_interval_ms);
: continue;
: }
This is broken if the RPC was rejected for FLAGS_backend_client_connection_num_retries number of times in a row. In which case, we will break out of the loop and return Status::OK(). This can lead to remote receiver hanging forever as it still thinks there are still active senders.
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-Comment-Date: Fri, 06 Oct 2017 17:58:27 +0000
Gerrit-HasComments: Yes