You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2017/11/09 22:55:37 UTC
[2/3] incubator-impala git commit: IMPALA-4856: Port data stream
service to KRPC
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 0731d45..68f00e3 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -17,46 +17,596 @@
#include "runtime/krpc-data-stream-recvr.h"
-#include "common/logging.h"
+#include <condition_variable>
+#include <queue>
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/sorted-run-merger.h"
+#include "util/runtime-profile-counters.h"
+#include "util/periodic-counter-updater.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+
+#include "common/names.h"
DECLARE_bool(use_krpc);
+DECLARE_int32(datastream_service_num_deserialization_threads);
+
+using kudu::rpc::RpcContext;
+using std::condition_variable_any;
namespace impala {
-[[noreturn]] static void AbortUnsupportedFeature() {
- // We should have gotten here only if the FLAGS_use_krpc is set to true.
- CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 'use_krpc' "
- "is true.";
- // KRPC isn't supported yet, so abort.
- ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag to "
- "false and restart the cluster.");
+// Implements a FIFO queue of row batches from one or more senders. One queue is
+// maintained per sender if is_merging_ is true for the enclosing receiver, otherwise rows
+// from all senders are placed in the same queue.
+//
+// Batches are added by senders via AddBatch(), and removed by an enclosing
+// KrpcDataStreamRecvr via GetBatch(). There is a soft limit for the total amount of
+// memory consumed by buffered row batches in all sender queues of a receiver. If adding
+// a batch will push the memory consumption beyond the limit, that RPC is added to the
+// 'deferred batches' queue, which will be drained in FIFO order when space opens up.
+// Senders in that state will not be replied to until their row batches are deserialized
+// or the receiver is cancelled. This ensures that only one batch per sender is buffered
+// in the deferred batches queue.
+class KrpcDataStreamRecvr::SenderQueue {
+ public:
+  SenderQueue(KrpcDataStreamRecvr* parent_recvr, int num_senders);
+
+  // Returns the next batch from this sender queue and stores it in 'current_batch_'.
+  // A returned batch that is not filled to capacity does *not* indicate end-of-stream.
+  // The call blocks until another batch arrives, all senders close their channels, or
+  // the queue is cancelled. Returns CANCELLED if the queue was cancelled. Returns OK
+  // with '*next_batch' set to nullptr once all senders have closed their channels and
+  // no batches remain. The returned batch is owned by the sender queue. The caller must
+  // acquire the resources from the returned batch before the next call to GetBatch().
+  Status GetBatch(RowBatch** next_batch);
+
+  // Adds a new row batch to this sender queue if this stream has not been cancelled.
+  // If adding this batch causes us to exceed the receiver's buffer limit, the RPC state
+  // is copied into 'deferred_rpcs_' for deferred processing and this function returns
+  // immediately. The deferred RPCs are replied to later when space becomes available.
+  void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      RpcContext* context);
+
+  // Tries inserting the front of the 'deferred_rpcs_' queue into 'batch_queue_' if
+  // possible. On success, the first entry of 'deferred_rpcs_' is removed and the sender
+  // of the RPC is responded to. If the serialized row batch cannot be extracted from
+  // the entry, the error status is sent as the reply.
+  void DequeueDeferredRpc();
+
+  // Takes over the RPC state 'ctx' of an early sender for deferred processing and
+  // kicks off a deserialization task to process it asynchronously. The ownership of
+  // 'ctx' is transferred to this sender queue.
+  void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
+
+  // Decrements the number of remaining senders for this queue and signals a thread
+  // waiting on the arrival of a new batch if the count drops to 0. The number of senders
+  // will be 1 for a merging KrpcDataStreamRecvr.
+  void DecrementSenders();
+
+  // Sets the cancellation flag and signals cancellation to receiver and sender.
+  // Subsequent incoming batches will be dropped and senders in 'deferred_rpcs_' are
+  // replied to.
+  void Cancel();
+
+  // Must be called once to clean up any queued resources. Cancel() must have been called
+  // first. Blocks until every pending insertion into 'batch_queue_' has completed.
+  void Close();
+
+  // Returns the current batch from this queue being processed by a consumer.
+  RowBatch* current_batch() const { return current_batch_.get(); }
+
+ private:
+  // Returns true if either (1) 'batch_queue_' is empty and there is no pending insertion
+  // or (2) inserting a row batch of 'batch_size' into 'batch_queue_' will not cause the
+  // soft limit of the receiver to be exceeded. Expected to be called with 'lock_' held.
+  bool CanEnqueue(int64_t batch_size) const;
+
+  // Unpacks a serialized row batch from 'request' and 'rpc_context' and populates
+  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
+  // be stored in 'batch_size'. On failure, the error status is returned.
+  static Status UnpackRequest(const TransmitDataRequestPB* request,
+      RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data,
+      int64_t* batch_size);
+
+  // The workhorse function for deserializing a row batch represented by ('header',
+  // 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue_'. Expects to
+  // be called with 'lock_' held and passed into this function via the argument 'lock'.
+  // This function may drop the lock while deserializing the row batch and re-acquire
+  // it after the row batch is deserialized. 'batch_size' is the size in bytes of the
+  // deserialized row batch. The caller is expected to have called CanEnqueue() to make
+  // sure the row batch can be inserted without exceeding the soft limit of the
+  // receiver. Also notifies a thread waiting on 'data_arrival_cv_'.
+  void AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
+      const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
+      unique_lock<SpinLock>* lock);
+
+  // Receiver of which this queue is a member.
+  KrpcDataStreamRecvr* recvr_;
+
+  // Protects all subsequent fields.
+  SpinLock lock_;
+
+  // If true, the receiver fragment for this stream got cancelled.
+  bool is_cancelled_ = false;
+
+  // Number of deserialization requests sent to deserialization threads to drain
+  // 'deferred_rpcs_' which are yet to be processed. Used to limit the number of
+  // requests queued.
+  int num_deserialize_tasks_pending_ = 0;
+
+  // Number of senders which haven't closed the channel yet
+  // (if it drops to 0, end-of-stream is true).
+  int num_remaining_senders_;
+
+  // Number of pending row batch insertions. AddBatchWork() may drop and reacquire
+  // 'lock_'. This causes races between multiple threads calling AddBatch() at the same
+  // time, or between threads calling AddBatch() and threads calling Close().
+  // AddBatchWork() increments this counter before dropping 'lock_' to deserialize the
+  // row batch. The counter is decremented after 'lock_' is re-acquired and the row
+  // batch is inserted into 'batch_queue_'. The races are as follows:
+  //
+  // 1. Multiple threads inserting into an empty 'batch_queue_' concurrently may all see
+  //    it as empty before the first thread manages to insert into 'batch_queue_'. This
+  //    may cause the soft limit to be exceeded. A queue is truly empty iff this counter
+  //    is 0.
+  //
+  // 2. Close() cannot proceed until this counter is 0, to make sure all pending inserts
+  //    complete before 'batch_queue_' is cleared.
+  int num_pending_enqueue_ = 0;
+
+  // Signals the arrival of a new batch or the eos/cancelled condition.
+  condition_variable_any data_arrival_cv_;
+
+  // Queue of (batch length in bytes, batch) pairs. The SenderQueue owns the memory of
+  // these batches until they are handed off to the callers of GetBatch().
+  // The length is an int64_t so it stores exactly the 'batch_size' that AddBatchWork()
+  // adds to the receiver's 'num_buffered_bytes_'. GetBatch() subtracts this stored
+  // value, so a narrower type would silently truncate and corrupt the accounting.
+  using RowBatchQueue = list<pair<int64_t, std::unique_ptr<RowBatch>>>;
+  RowBatchQueue batch_queue_;
+
+  // The batch that was most recently returned via GetBatch(), i.e. the current batch
+  // from this queue being processed by a consumer. It's destroyed when the next batch
+  // is retrieved.
+  scoped_ptr<RowBatch> current_batch_;
+
+  // Set to true when the first batch has been received.
+  bool received_first_batch_ = false;
+
+  // Queue of deferred RPCs - those that have a batch to deliver, but the queue was
+  // full when they last tried to do so. The senders wait here until there is space for
+  // their batches, allowing the receiver side to implement basic flow control.
+  std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
+};
+
+// Every sender feeding this queue starts out as "remaining"; DecrementSenders() counts
+// them down as they close their channels.
+KrpcDataStreamRecvr::SenderQueue::SenderQueue(KrpcDataStreamRecvr* parent_recvr,
+    int num_senders)
+  : recvr_(parent_recvr),
+    num_remaining_senders_(num_senders) {}
+
+// Blocks until 'batch_queue_' holds a batch, all senders have finished, or the queue is
+// cancelled.
+// - If the queue was cancelled, returns CANCELLED.
+// - Otherwise returns OK, with '*next_batch' pointing at 'current_batch_' (still owned
+//   by this queue).
+// - '*next_batch' is set to nullptr once all senders are done and 'batch_queue_' is
+//   drained.
+// Removing a batch frees buffer space, so this also schedules deserialization tasks for
+// up to FLAGS_datastream_service_num_deserialization_threads deferred RPCs.
+Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
+ SCOPED_TIMER(recvr_->queue_get_batch_time_);
+ int num_to_dequeue = 0;
+ // The sender id is set below when we decide to dequeue entries from 'deferred_rpcs_'.
+ int sender_id = -1;
+ {
+ unique_lock<SpinLock> l(lock_);
+ // current_batch_ must be replaced with the returned batch.
+ current_batch_.reset();
+ *next_batch = nullptr;
+
+ // Wait until something shows up or we know we're done
+ while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) {
+ VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
+ << " node=" << recvr_->dest_node_id();
+ // Don't count time spent waiting on the sender as active time.
+ CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
+ CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
+ CANCEL_SAFE_SCOPED_TIMER(
+ received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
+ &is_cancelled_);
+ data_arrival_cv_.wait(l);
+ }
+
+ if (UNLIKELY(is_cancelled_)) {
+ // Cancellation should have drained the entire 'deferred_rpcs_' queue.
+ // Make sure the senders were replied to or they may be stuck waiting for a reply.
+ DCHECK(deferred_rpcs_.empty());
+ return Status::CANCELLED;
+ }
+
+ // All senders have sent their row batches. Nothing to do.
+ if (num_remaining_senders_ == 0 && batch_queue_.empty()) {
+ // Note that it's an invariant that a sender cannot send the EOS RPC until all
+ // outstanding TransmitData() RPCs have been replied to. Therefore, it should be
+ // impossible for num_remaining_senders_ to reach 0 before all RPCs in
+ // 'deferred_rpcs_' have been replied to.
+ DCHECK(deferred_rpcs_.empty());
+ DCHECK_EQ(num_pending_enqueue_, 0);
+ return Status::OK();
+ }
+
+ // Notify the deserialization threads to retry delivering the deferred RPCs.
+ if (!deferred_rpcs_.empty()) {
+ // Try dequeuing multiple entries from 'deferred_rpcs_' to parallelize the CPU
+ // bound deserialization work. No point in dequeuing more than number of
+ // deserialization threads available.
+ DCHECK_GE(deferred_rpcs_.size(), num_deserialize_tasks_pending_);
+ num_to_dequeue = min(FLAGS_datastream_service_num_deserialization_threads,
+ (int)deferred_rpcs_.size() - num_deserialize_tasks_pending_);
+ // Count the new tasks as pending now (under the lock) so that later calls don't
+ // schedule more tasks than there are deferred RPCs.
+ num_deserialize_tasks_pending_ += num_to_dequeue;
+ sender_id = deferred_rpcs_.front()->request->sender_id();
+ }
+
+ DCHECK(!batch_queue_.empty());
+ received_first_batch_ = true;
+ RowBatch* result = batch_queue_.front().second.release();
+ // Release the buffer space that AddBatchWork() reserved for this batch.
+ recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+ batch_queue_.pop_front();
+ VLOG_ROW << "fetched #rows=" << result->num_rows();
+ current_batch_.reset(result);
+ *next_batch = current_batch_.get();
+ }
+ // Don't hold lock when calling EnqueueDeserializeTask() as it may block.
+ // It's important that the dequeuing of 'deferred_rpcs_' is done after the entry
+ // has been removed from 'batch_queue_' or the deserialization threads may fail to
+ // insert into a non-empty 'batch_queue_' and the receiver will be waiting forever.
+ if (num_to_dequeue > 0) {
+ DCHECK_GE(sender_id, 0);
+ recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
+ recvr_->dest_node_id(), sender_id, num_to_dequeue);
+ }
+ return Status::OK();
}
-[[noreturn]] KrpcDataStreamRecvr::KrpcDataStreamRecvr() {
- AbortUnsupportedFeature();
+inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) const {
+  // A truly empty queue (nothing queued and no insertion in flight) always accepts the
+  // batch, even if that overshoots the buffer limit. A merging receiver consumes its
+  // queues in data order. Refusing a batch to an empty queue could leave the merger
+  // waiting forever on a queue that can never be filled.
+  const bool has_pending_insert = num_pending_enqueue_ != 0;
+  if (batch_queue_.empty() && !has_pending_insert) return true;
+  // Otherwise admit the batch only if it keeps the receiver within its soft limit.
+  return !recvr_->ExceedsLimit(batch_size);
}
-KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {
+Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
+    const TransmitDataRequestPB* request, RpcContext* rpc_context,
+    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) {
+  // Both halves of the serialized row batch arrive as inbound sidecars. Locate them by
+  // the indices recorded in the request.
+  const auto offsets_idx = request->tuple_offsets_sidecar_idx();
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(offsets_idx, tuple_offsets),
+      "Failed to get the tuple offsets sidecar");
+  const auto data_idx = request->tuple_data_sidecar_idx();
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(data_idx, tuple_data),
+      "Failed to get the tuple data sidecar");
+  // The deserialized footprint is what gets compared against the buffer limit.
+  const RowBatchHeaderPB& header = request->row_batch_header();
+  *batch_size = RowBatch::GetDeserializedSize(header, *tuple_offsets);
+  return Status::OK();
}
-[[noreturn]] Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) {
- AbortUnsupportedFeature();
+// Deserializes the row batch described by ('header', 'tuple_offsets', 'tuple_data') and
+// appends it to 'batch_queue_'. Must be entered with 'lock_' held through '*lock'.
+// The lock is released while the batch is deserialized and re-acquired before the
+// insertion. The buffer space ('num_buffered_bytes_') and 'num_pending_enqueue_' are
+// therefore reserved up front, so concurrent AddBatch()/Close() callers see a consistent
+// state. Returns with 'lock_' held again.
+void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
+ const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets,
+ const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
+ DCHECK(lock != nullptr);
+ DCHECK(lock->owns_lock());
+
+ COUNTER_ADD(recvr_->num_accepted_batches_, 1);
+ COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
+ // Reserve queue space before dropping the lock below.
+ recvr_->num_buffered_bytes_.Add(batch_size);
+ DCHECK_GE(num_pending_enqueue_, 0);
+ ++num_pending_enqueue_;
+
+ // Deserialization may take some time due to compression and memory allocation.
+ // Drop the lock so we can deserialize multiple batches in parallel.
+ // NOTE: 'lock_' is not held from here until lock->lock() below.
+ lock->unlock();
+ unique_ptr<RowBatch> batch;
+ {
+ SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
+ // At this point, the row batch will be inserted into batch_queue_. Close() will
+ // handle deleting any unconsumed batches from batch_queue_. Close() cannot proceed
+ // until there are no pending insertion to batch_queue_.
+ batch.reset(new RowBatch(recvr_->row_desc(), header, tuple_offsets, tuple_data,
+ recvr_->mem_tracker()));
+ }
+ lock->lock();
+
+ DCHECK_GT(num_pending_enqueue_, 0);
+ --num_pending_enqueue_;
+ VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+ // The bytes were already added to 'num_buffered_bytes_' above; GetBatch() subtracts
+ // the stored length when the batch is dequeued.
+ batch_queue_.emplace_back(batch_size, move(batch));
+ // Wakes one waiter on 'data_arrival_cv_': either a consumer blocked in GetBatch() or
+ // Close() waiting for 'num_pending_enqueue_' to reach 0.
+ data_arrival_cv_.notify_one();
}
-[[noreturn]] void KrpcDataStreamRecvr::Close() {
- AbortUnsupportedFeature();
+void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  // TODO: Add timers for time spent in this function and queue time in 'batch_queue_'.
+  // Replies to the sender, carrying 'reply_status' in the response payload.
+  auto reply = [response, rpc_context](Status reply_status) {
+    reply_status.ToProto(response->mutable_status());
+    rpc_context->RespondSuccess();
+  };
+
+  kudu::Slice offsets_slice;
+  kudu::Slice data_slice;
+  int64_t deserialized_bytes;
+  Status unpack_status = UnpackRequest(request, rpc_context, &offsets_slice,
+      &data_slice, &deserialized_bytes);
+  if (UNLIKELY(!unpack_status.ok())) {
+    // A malformed request is rejected straight away and never reaches the queues.
+    reply(unpack_status);
+    return;
+  }
+
+  {
+    unique_lock<SpinLock> guard(lock_);
+    // EndDataStream is only sent after every outstanding TransmitData() RPC has been
+    // replied to. This RPC has not been replied to yet, so at least one sender is live.
+    DCHECK_GT(num_remaining_senders_, 0);
+    if (UNLIKELY(is_cancelled_)) {
+      // Drop the batch but ack it so the sender isn't left waiting.
+      reply(Status::OK());
+      return;
+    }
+
+    // Defer the RPC in two cases:
+    // - Earlier RPCs are already parked. Queueing behind them keeps FIFO order, so no
+    //   sender starves in the non-merging case.
+    // - The batch would overflow the buffer.
+    // The sender is replied to once the deferred batch is eventually accepted.
+    const bool must_defer =
+        !deferred_rpcs_.empty() || !CanEnqueue(deserialized_bytes);
+    if (UNLIKELY(must_defer)) {
+      deferred_rpcs_.push(make_unique<TransmitDataCtx>(request, response, rpc_context));
+      COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+      return;
+    }
+
+    // Committed: the batch goes into 'batch_queue_'.
+    AddBatchWork(deserialized_bytes, request->row_batch_header(), offsets_slice,
+        data_slice, &guard);
+  }
+
+  // Ack the successful insertion.
+  reply(Status::OK());
+}
+
+// Runs one deserialization task (accounted in 'num_deserialize_tasks_pending_') and
+// tries to move the oldest deferred RPC into 'batch_queue_'.
+// - If the batch is inserted, the sender is replied to with OK.
+// - If the batch cannot be unpacked, the sender is replied to with the error status.
+// - If the batch still does not fit, the RPC stays at the head of 'deferred_rpcs_'
+//   without a reply. GetBatch() schedules further tasks once it frees up space.
+void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+ // Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
+ std::unique_ptr<TransmitDataCtx> ctx;
+ {
+ unique_lock<SpinLock> l(lock_);
+ DCHECK_GT(num_deserialize_tasks_pending_, 0);
+ --num_deserialize_tasks_pending_;
+
+ // Returns if the queue has been cancelled or if it's empty.
+ if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return;
+
+ // Try enqueuing the first entry into 'batch_queue_'.
+ // The swap temporarily takes ownership of the front entry and leaves a null
+ // placeholder in the queue. Below, the entry is either popped for good or swapped
+ // back.
+ ctx.swap(deferred_rpcs_.front());
+ kudu::Slice tuple_offsets;
+ kudu::Slice tuple_data;
+ int64_t batch_size;
+ Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
+ &tuple_data, &batch_size);
+ // Reply with error status if the entry cannot be unpacked.
+ if (UNLIKELY(!status.ok())) {
+ status.ToProto(ctx->response->mutable_status());
+ ctx->rpc_context->RespondSuccess();
+ // Drop the null placeholder left behind by the swap above.
+ deferred_rpcs_.pop();
+ return;
+ }
+
+ // Stops if inserting the batch causes us to go over the limit.
+ // Put 'ctx' back on the queue.
+ if (!CanEnqueue(batch_size)) {
+ ctx.swap(deferred_rpcs_.front());
+ DCHECK(deferred_rpcs_.front().get() != nullptr);
+ return;
+ }
+
+ // Dequeues the deferred batch and adds it to 'batch_queue_'.
+ deferred_rpcs_.pop();
+ const RowBatchHeaderPB& header = ctx->request->row_batch_header();
+ AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+ }
+
+ // Responds to the sender to ack the insertion of the row batches.
+ Status::OK().ToProto(ctx->response->mutable_status());
+ ctx->rpc_context->RespondSuccess();
+}
+
+// Takes ownership of an early sender's RPC 'ctx' and schedules a deserialization task
+// for it. If this queue has already been cancelled, the sender is replied to with OK
+// right away instead. By then Cancel() has already drained 'deferred_rpcs_' (it does so
+// only once), and DequeueDeferredRpc() ignores a cancelled queue. An RPC parked here
+// after cancellation would therefore never be answered and its sender would hang.
+void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
+    unique_ptr<TransmitDataCtx> ctx) {
+  int sender_id = ctx->request->sender_id();
+  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+  {
+    lock_guard<SpinLock> l(lock_);
+    if (UNLIKELY(is_cancelled_)) {
+      // Drop the batch but ack the RPC, like AddBatch() does on a cancelled queue.
+      Status::OK().ToProto(ctx->response->mutable_status());
+      ctx->rpc_context->RespondSuccess();
+      return;
+    }
+    deferred_rpcs_.push(move(ctx));
+    ++num_deserialize_tasks_pending_;
+  }
+  // Enqueue outside the lock; the deserialization task re-acquires 'lock_'.
+  recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
+      recvr_->dest_node_id(), sender_id, 1);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DecrementSenders() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK_GT(num_remaining_senders_, 0);
+  // Clamp at zero so an unexpected extra call in a release build cannot drive the
+  // count negative.
+  num_remaining_senders_ =
+      num_remaining_senders_ > 0 ? num_remaining_senders_ - 1 : 0;
+  VLOG_FILE << "decremented senders: fragment_instance_id="
+            << recvr_->fragment_instance_id()
+            << " node_id=" << recvr_->dest_node_id()
+            << " #senders=" << num_remaining_senders_;
+  // The last sender closing means end-of-stream; wake the consumer blocked in GetBatch().
+  const bool all_senders_done = num_remaining_senders_ == 0;
+  if (all_senders_done) data_arrival_cv_.notify_one();
+}
+
+void KrpcDataStreamRecvr::SenderQueue::Cancel() {
+  {
+    lock_guard<SpinLock> guard(lock_);
+    // Cancellation is idempotent; only the first call does any work.
+    if (is_cancelled_) return;
+    is_cancelled_ = true;
+
+    // Ack every parked sender now; otherwise each one would wait forever for a reply.
+    for (; !deferred_rpcs_.empty(); deferred_rpcs_.pop()) {
+      TransmitDataCtx* parked = deferred_rpcs_.front().get();
+      Status::OK().ToProto(parked->response->mutable_status());
+      parked->rpc_context->RespondSuccess();
+    }
+  }
+  VLOG_QUERY << "cancelled stream: fragment_instance_id_="
+             << recvr_->fragment_instance_id()
+             << " node_id=" << recvr_->dest_node_id();
+  // Wake every thread waiting on 'data_arrival_cv_'. GetBatch() re-checks
+  // 'is_cancelled_' when it wakes up.
+  data_arrival_cv_.notify_all();
+  PeriodicCounterUpdater::StopTimeSeriesCounter(
+      recvr_->bytes_received_time_series_counter_);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::Close() {
+  unique_lock<SpinLock> guard(lock_);
+  // Closing a queue that was not cancelled could race with inserts and leak row
+  // batches (IMPALA-3034), so Cancel() must have run first.
+  DCHECK(is_cancelled_);
+
+  // AddBatchWork() may still be deserializing outside the lock. Let every pending
+  // insertion land before the queue is torn down.
+  data_arrival_cv_.wait(guard, [this]() { return num_pending_enqueue_ <= 0; });
+
+  // Free everything still buffered, including the batch handed out last.
+  batch_queue_.clear();
+  current_batch_.reset();
+}
+
+Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
+  DCHECK(is_merging_);
+  // Create the merger that combines the per-sender sorted runs into a single stream of
+  // sorted rows. deep_copy is false: resources are transferred from the input batches
+  // instead of being copied.
+  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
+
+  // Each sender queue supplies one sorted run to the merger.
+  vector<SortedRunMerger::RunBatchSupplierFn> run_suppliers;
+  run_suppliers.reserve(sender_queues_.size());
+  for (SenderQueue* sender_queue : sender_queues_) {
+    run_suppliers.emplace_back([sender_queue](RowBatch** next_batch) {
+      return sender_queue->GetBatch(next_batch);
+    });
+  }
+
+  RETURN_IF_ERROR(merger_->Prepare(run_suppliers));
+  return Status::OK();
}
-[[noreturn]] Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
- AbortUnsupportedFeature();
+void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
+  // Hand over the resources of the batch each queue is currently serving, if any.
+  for (SenderQueue* sender_queue : sender_queues_) {
+    RowBatch* in_flight = sender_queue->current_batch();
+    if (in_flight == nullptr) continue;
+    in_flight->TransferResourceOwnership(transfer_batch);
+  }
+}
+
+// Creates the receiver for plan node 'dest_node_id' of fragment instance
+// 'fragment_instance_id'.
+// - A merging receiver gets one SenderQueue per sender, each expecting a single sender.
+// - A non-merging receiver gets one queue shared by all 'num_senders' senders.
+// The queues are allocated from 'sender_queue_pool_'. Runtime profile counters are
+// registered under the "RecvrSide" and "SenderSide" children of 'profile'.
+KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
+ MemTracker* parent_tracker, const RowDescriptor* row_desc,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+ bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile)
+ : mgr_(stream_mgr),
+ fragment_instance_id_(fragment_instance_id),
+ dest_node_id_(dest_node_id),
+ total_buffer_limit_(total_buffer_limit),
+ row_desc_(row_desc),
+ is_merging_(is_merging),
+ num_buffered_bytes_(0),
+ profile_(profile),
+ // NOTE(review): the two child profiles below are built from 'profile_', so
+ // 'profile_' must be declared before them in the class. Confirm in the header.
+ recvr_side_profile_(profile_->CreateChild("RecvrSide")),
+ sender_side_profile_(profile_->CreateChild("SenderSide")) {
+ // NOTE(review): a limit of -1 presumably means the tracker has no limit of its own.
+ // Confirm against MemTracker.
+ mem_tracker_.reset(new MemTracker(-1, "KrpcDataStreamRecvr", parent_tracker));
+ // Create one queue per sender if is_merging is true.
+ int num_queues = is_merging ? num_senders : 1;
+ sender_queues_.reserve(num_queues);
+ int num_sender_per_queue = is_merging ? 1 : num_senders;
+ for (int i = 0; i < num_queues; ++i) {
+ SenderQueue* queue =
+ sender_queue_pool_.Add(new SenderQueue(this, num_sender_per_queue));
+ sender_queues_.push_back(queue);
+ }
+
+ // Initialize the counters
+ bytes_received_counter_ =
+ ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
+ bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+ recvr_side_profile_, "BytesReceived", bytes_received_counter_);
+ queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
+ // "DataArrivalTimer" is nested under "TotalGetBatchTime", which is added just above.
+ data_arrival_timer_ =
+ ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
+ first_batch_wait_total_timer_ =
+ ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
+ deserialize_row_batch_timer_ =
+ ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
+ // GetBatch() charges its time waiting for data to the profile's inactive timer.
+ inactive_timer_ = profile_->inactive_timer();
+ num_early_senders_ =
+ ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
+ num_deferred_batches_ =
+ ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
+ num_accepted_batches_ =
+ ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT);
}
-[[noreturn]] Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
- AbortUnsupportedFeature();
+Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
+  // Merging receivers only: CreateMerger() must have been called first.
+  DCHECK(merger_);
+  return merger_->GetNext(output_batch, eos);
+}
+
+void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  // A merging receiver keeps one queue per sender; otherwise queue 0 takes every batch.
+  SenderQueue* target_queue = sender_queues_[is_merging_ ? request->sender_id() : 0];
+  target_queue->AddBatch(request, response, rpc_context);
+}
+
+// Retries the oldest deferred RPC of the queue that 'sender_id' feeds.
+void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) {
+  // Deferred RPCs live in the queue their batch was destined for: the per-sender queue
+  // when merging, otherwise the single shared queue 0.
+  int use_sender_id = is_merging_ ? sender_id : 0;
+  // Guard against indexing past 'sender_queues_' with a bogus sender id.
+  DCHECK_GE(use_sender_id, 0);
+  DCHECK_LT(use_sender_id, static_cast<int>(sender_queues_.size()));
+  sender_queues_[use_sender_id]->DequeueDeferredRpc();
+}
+
+void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
+  // Pick the destination queue before 'ctx' is moved away. Non-merging receivers route
+  // everything to queue 0.
+  const int queue_idx = is_merging_ ? ctx->request->sender_id() : 0;
+  sender_queues_[queue_idx]->TakeOverEarlySender(move(ctx));
+  COUNTER_ADD(num_early_senders_, 1);
+}
+
+void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
+  // Non-merging receivers track every sender in the single queue 0.
+  SenderQueue* queue = sender_queues_[is_merging_ ? sender_id : 0];
+  queue->DecrementSenders();
+}
+
+void KrpcDataStreamRecvr::CancelStream() {
+  // Cancel every queue so blocked consumers wake up and parked senders get a reply.
+  for (SenderQueue* queue : sender_queues_) {
+    queue->Cancel();
+  }
+}
+
+void KrpcDataStreamRecvr::Close() {
+  // Unregister from the KrpcDataStreamMgr that created this receiver. All the sender
+  // queues will have been cancelled by the time this call returns.
+  const Status deregister_status =
+      mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
+  if (UNLIKELY(!deregister_status.ok())) {
+    LOG(ERROR) << "Error deregistering receiver: " << deregister_status.GetDetail();
+  }
+  // Cleared so the destructor can verify that Close() ran.
+  mgr_ = nullptr;
+  for (SenderQueue* queue : sender_queues_) queue->Close();
+  merger_.reset();
+  mem_tracker_->Close();
+  recvr_side_profile_->StopPeriodicCounters();
+}
+
+KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {
+  // Close() clears 'mgr_'; a non-null value means the receiver was never closed.
+  DCHECK(!mgr_) << "Must call Close()";
}
-[[noreturn]] void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
- AbortUnsupportedFeature();
+Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) {
+  // Only a non-merging receiver hands out batches directly, and it has exactly one queue.
+  DCHECK(!is_merging_);
+  DCHECK_EQ(sender_queues_.size(), 1);
+  SenderQueue* only_queue = sender_queues_[0];
+  return only_queue->GetBatch(next_batch);
}
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index be46ae3..8bd99cf 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -20,28 +20,207 @@
#include "data-stream-recvr-base.h"
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/object-pool.h"
#include "common/status.h"
+#include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/descriptors.h"
+#include "util/tuple-row-compare.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
namespace impala {
+class KrpcDataStreamMgr;
+class MemTracker;
class RowBatch;
-class TupleRowComparator;
+class RuntimeProfile;
+class SortedRunMerger;
+class TransmitDataCtx;
+class TransmitDataRequestPB;
+class TransmitDataResponsePB;
-/// TODO: Stub for the KRPC version of the DataStreamRecvr. Fill with actual
-/// implementation.
+/// Single receiver of an m:n data stream.
+///
+/// KrpcDataStreamRecvr maintains one or more queues of row batches received by a
+/// KrpcDataStreamMgr from one or more sender fragment instances. Receivers are created
+/// via KrpcDataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between
+/// the KrpcDataStreamMgr that created it and the caller of
+/// KrpcDataStreamMgr::CreateRecvr() (i.e. the exchange node).
+///
+/// The is_merging_ member determines if the recvr merges input streams from different
+/// sender fragment instances according to a specified sort order.
+/// If is_merging_ is false : Only one batch queue is maintained for row batches from all
+/// sender fragment instances. These row batches are returned one at a time via GetBatch()
+/// If is_merging_ is true : One queue is created for the batches from each distinct
+/// sender. A SortedRunMerger instance must be created via CreateMerger() prior to
+/// retrieving any rows from the receiver. Rows are retrieved from the receiver via
+/// GetNext(RowBatch* output_batch, bool* eos). After the final call to
+/// GetNext(), TransferAllResources() must be called to transfer resources from the input
+/// batches from each sender to the caller's output batch.
+/// The receiver sets deep_copy to false on the merger - resources are transferred from
+/// the input batches from each sender queue to the merger to the output batch by the
+/// merger itself as it processes each run.
+///
+/// KrpcDataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove
+/// the recvr instance from the tracking structure of its KrpcDataStreamMgr in all cases.
class KrpcDataStreamRecvr : public DataStreamRecvrBase {
public:
- [[noreturn]] KrpcDataStreamRecvr();
- virtual ~KrpcDataStreamRecvr() override;
+ ~KrpcDataStreamRecvr();
+
+ /// Returns next row batch in data stream; blocks if there aren't any.
+ /// Retains ownership of the returned batch. The caller must call TransferAllResources()
+ /// to acquire the resources from the returned batch before the next call to GetBatch().
+ /// A NULL returned batch indicated eos. Must only be called if is_merging_ is false.
+ /// TODO: This is currently only exposed to the non-merging version of the exchange.
+ /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
+ Status GetBatch(RowBatch** next_batch);
+
+ /// Deregister from KrpcDataStreamMgr instance, which shares ownership of this instance.
+ void Close();
+
+ /// Create a SortedRunMerger instance to merge rows from multiple sender according to
+ /// the specified row comparator. Fetches the first batches from the individual sender
+ /// queues. The exprs used in less_than must have already been prepared and opened.
+ Status CreateMerger(const TupleRowComparator& less_than);
+
+ /// Fill output_batch with the next batch of rows obtained by merging the per-sender
+ /// input streams. Must only be called if is_merging_ is true.
+ Status GetNext(RowBatch* output_batch, bool* eos);
+
+ /// Transfer all resources from the current batches being processed from each sender
+ /// queue to the specified batch.
+ void TransferAllResources(RowBatch* transfer_batch);
+
+ const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
+ PlanNodeId dest_node_id() const { return dest_node_id_; }
+ const RowDescriptor* row_desc() const { return row_desc_; }
+ MemTracker* mem_tracker() const { return mem_tracker_.get(); }
+
+ private:
+ friend class KrpcDataStreamMgr;
+ class SenderQueue;
+
+ KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, MemTracker* parent_tracker,
+ const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging,
+ int64_t total_buffer_limit, RuntimeProfile* profile);
+
+ /// Adds a new row batch to the appropriate sender queue. If the row batch can be
+ /// inserted, the RPC will be responded to before this function returns. If the batch
+ /// can't be added without exceeding the buffer limit, it is appended to a queue for
+ /// deferred processing. The RPC will be responded to when the row batch is deserialized
+ /// later.
+ void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+ kudu::rpc::RpcContext* context);
+
+ /// Tries adding the first entry of 'deferred_rpcs_' queue for the sender queue
+ /// identified by 'sender_id'. If is_merging_ is false, it always defaults to
+ /// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id_'.
+ void DequeueDeferredRpc(int sender_id);
+
+ /// Takes over the RPC state 'ctx' of an early sender for deferred processing and
+ /// kicks off a deserialization task to process it asynchronously. This makes sure
+ /// new incoming RPCs won't pass the early senders, leading to starvation.
+ void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
+
+ /// Indicate that a particular sender is done. Delegated to the appropriate
+ /// sender queue. Called from KrpcDataStreamMgr.
+ void RemoveSender(int sender_id);
+
+ /// Marks all sender queues as cancelled and notifies all waiting consumers of
+ /// cancellation.
+ void CancelStream();
+
+ /// Return true if the addition of a new batch of size 'batch_size' would exceed the
+ /// total buffer limit.
+ bool ExceedsLimit(int64_t batch_size) {
+ return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
+ }
+
+ /// KrpcDataStreamMgr instance used to create this recvr. (Not owned)
+ KrpcDataStreamMgr* mgr_;
+
+ /// Fragment and node id of the destination exchange node this receiver is used by.
+ TUniqueId fragment_instance_id_;
+ PlanNodeId dest_node_id_;
+
+ /// Soft upper limit on the total amount of buffering in bytes allowed for this stream
+ /// across all sender queues. We defer processing of incoming RPCs once the amount of
+ /// buffered data exceeds this value.
+ const int64_t total_buffer_limit_;
+
+ /// Row schema.
+ const RowDescriptor* row_desc_;
+
+ /// True if this reciver merges incoming rows from different senders. Per-sender
+ /// row batch queues are maintained in this case.
+ bool is_merging_;
+
+ /// total number of bytes held across all sender queues.
+ AtomicInt32 num_buffered_bytes_;
+
+ /// Memtracker for batches in the sender queue(s).
+ boost::scoped_ptr<MemTracker> mem_tracker_;
+
+ /// One or more queues of row batches received from senders. If is_merging_ is true,
+ /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
+ /// are placed in the same SenderQueue. The SenderQueue instances are owned by the
+ /// receiver and placed in sender_queue_pool_.
+ std::vector<SenderQueue*> sender_queues_;
+
+ /// SortedRunMerger used to merge rows from different senders.
+ boost::scoped_ptr<SortedRunMerger> merger_;
+
+ /// Pool of sender queues.
+ ObjectPool sender_queue_pool_;
+
+ /// Runtime profile storing the counters below.
+ RuntimeProfile* profile_;
+
+ /// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
+ /// and sender side measurements (from AddBatch()).
+ RuntimeProfile* recvr_side_profile_;
+ RuntimeProfile* sender_side_profile_;
+
+ /// Number of bytes received.
+ RuntimeProfile::Counter* bytes_received_counter_;
+
+ /// Time series of number of bytes received, samples bytes_received_counter_
+ RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
+
+ /// Total wall-clock time spent deserializing row batches.
+ RuntimeProfile::Counter* deserialize_row_batch_timer_;
+
+ /// Number of senders which arrive before the receiver is ready.
+ RuntimeProfile::Counter* num_early_senders_;
+
+ /// Time spent waiting until the first batch arrives across all queues.
+ /// TODO: Turn this into a wall-clock timer.
+ RuntimeProfile::Counter* first_batch_wait_total_timer_;
+
+ /// Total number of batches received and deferred as sender queue is full.
+ RuntimeProfile::Counter* num_deferred_batches_;
+
+ /// Total number of batches received and accepted into the sender queue.
+ RuntimeProfile::Counter* num_accepted_batches_;
+
+ /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
+ RuntimeProfile::Counter* data_arrival_timer_;
- [[noreturn]] Status GetBatch(RowBatch** next_batch) override;
- [[noreturn]] void Close() override;
- [[noreturn]] Status CreateMerger(const TupleRowComparator& less_than) override;
- [[noreturn]] Status GetNext(RowBatch* output_batch, bool* eos) override;
- [[noreturn]] void TransferAllResources(RowBatch* transfer_batch) override;
+ /// Pointer to profile's inactive timer.
+ RuntimeProfile::Counter* inactive_timer_;
+ /// Total time spent in SenderQueue::GetBatch().
+ RuntimeProfile::Counter* queue_get_batch_time_;
};
} // namespace impala
-#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H */
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
new file mode 100644
index 0000000..32e20cd
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/krpc-data-stream-sender.h"
+
+#include <boost/bind.hpp>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <iostream>
+#include <random>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/logging.h"
+#include "exec/kudu-util.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/rpc-mgr.inline.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/aligned-new.h"
+#include "util/debug-util.h"
+#include "util/network-util.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+#include "gen-cpp/data_stream_service.proxy.h"
+#include "gen-cpp/Types_types.h"
+
+#include "common/names.h"
+
+using std::condition_variable_any;
+using namespace apache::thrift;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
+using kudu::MonoDelta;
+
+DECLARE_int32(rpc_retry_interval_ms);
+
+namespace impala {
+
+// A datastream sender may send row batches to multiple destinations. There is one
+// channel for each destination.
+//
+// Clients can call TransmitData() to directly send a serialized row batch to the
+// destination or it can call AddRow() to accumulate rows in an internal row batch
+// to certain capacity before sending it. The underlying RPC layer is implemented
+// with KRPC, which provides interfaces for asynchronous RPC calls. Normally, the
+// calls above will return before the RPC has completed but they may block if there
+// is already an in-flight RPC.
+//
+// Each channel internally has two OutboundRowBatch to serialize to. They are reused
+// across multiple RPC calls. Having two OutboundRowBatch allows client to serialize
+// the next row batch while the current row batch is being sent. Upon completion of
+// a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC fails due to
+// remote service's queue being full, TransmitDataCompleteCb() will schedule the retry
+// callback RetryCb() after some delay derived from 'FLAGS_rpc_retry_interval_ms'.
+//
+// When a data stream sender is shut down, it will call Teardown() on all channels to
+// release resources. Teardown() will cancel any in-flight RPC and wait for the
+// completion callback to be called before returning. The execution thread is expected
+// to call FlushAndSendEos() before closing the data stream sender to flush all
+// buffered row batches and send the end-of-stream message to the remote receiver.
+// Note that the RPC payloads are owned solely by the channel and the KRPC layer will
+// relinquish references of them before the completion callback is invoked so it's
+// safe to free them once the callback has been invoked.
+//
+// Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client
+// has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the
+// right place, except that's currently called too early). RpcController::Cancel() ensures
+// that the callback is called only after the RPC layer no longer references the sidecar
+// buffers.
+class KrpcDataStreamSender::Channel : public CacheLineAligned {
+ public:
+ // Creates a channel to send data to particular ipaddress/port/fragment instance id/node
+ // combination. buffer_size is meant to be specified in bytes and a soft limit on how
+ // much tuple data is accumulated before being sent; it only applies when data is added
+ // via AddRow() and not sent directly via TransmitData() / SerializeAndSendBatch().
+ // NOTE(review): this constructor does not read 'buffer_size'; Init() sizes 'batch_'
+ // from the parent's 'per_channel_buffer_size_' instead.
+ Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
+ const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int buffer_size)
+ : parent_(parent),
+ row_desc_(row_desc),
+ address_(destination),
+ fragment_instance_id_(fragment_instance_id),
+ dest_node_id_(dest_node_id) {
+ DCHECK(IsResolvedAddress(address_));
+ }
+
+ // Initializes the channel: allocates the row batch used by AddRow() and obtains an
+ // RPC proxy for the destination.
+ // Returns OK if successful, error indication otherwise.
+ Status Init(RuntimeState* state);
+
+ // Serializes the given row batch and send it to the destination. If the preceding
+ // RPC is in progress, this function may block until the previous RPC finishes.
+ // Return error status if serialization or the preceding RPC failed. Return OK
+ // otherwise.
+ Status SerializeAndSendBatch(RowBatch* batch);
+
+ // Transmits the serialized row batch 'outbound_batch'. This function may block if the
+ // preceding RPC is still in-flight. This is expected to be called from the fragment
+ // instance execution thread. Return error status if initialization of the RPC request
+ // parameters failed or if the preceding RPC failed. Returns OK otherwise.
+ Status TransmitData(const OutboundRowBatch* outbound_batch);
+
+ // Copies a single row into this channel's row batch and flushes the row batch once
+ // it reaches capacity. This call may block if the row batch's capacity is reached
+ // and the preceding RPC is still in progress. Returns error status if serialization
+ // failed or if the preceding RPC failed. Return OK otherwise.
+ Status AddRow(TupleRow* row);
+
+ // Shuts down the channel and frees the row batch allocation. Any in-flight RPC will
+ // be cancelled. It's expected that clients normally call FlushAndSendEos() before
+ // calling Teardown() to flush all buffered row batches to destinations. Teardown()
+ // may be called without FlushAndSendEos() in cases such as cancellation or error.
+ void Teardown(RuntimeState* state);
+
+ // Flushes any buffered row batches and sends the EOS RPC to close the channel.
+ // Return error status if either the last TransmitData() RPC or EOS RPC failed.
+ // This function blocks until the EOS RPC is complete.
+ Status FlushAndSendEos(RuntimeState* state);
+
+ // Returns the total serialized size of all row batches acknowledged with an OK
+ // status by the remote receiver.
+ int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
+
+ // The type for a RPC worker function.
+ typedef boost::function<Status()> DoRpcFn;
+
+ private:
+ // The parent data stream sender owning this channel. Not owned.
+ KrpcDataStreamSender* parent_;
+
+ // The descriptor of the accumulated rows in 'batch_' below. Used for computing
+ // the capacity of 'batch_' and also when adding a row in AddRow().
+ const RowDescriptor* row_desc_;
+
+ // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
+ const TNetworkAddress address_;
+ const TUniqueId fragment_instance_id_;
+ const PlanNodeId dest_node_id_;
+
+ // Number of bytes of all serialized row batches sent successfully.
+ int64_t num_data_bytes_sent_ = 0;
+
+ // The row batch for accumulating rows copied from AddRow().
+ // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
+ scoped_ptr<RowBatch> batch_;
+
+ // The outbound row batches are double-buffered so that we can serialize the next
+ // batch while the other is still referenced by the in-flight RPC. Each entry contains
+ // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data.
+ //
+ // TODO: replace this with an actual queue. Schedule another RPC callback in the
+ // completion callback if the queue is not empty.
+ // TODO: rethink whether to keep per-channel buffers vs having all buffers in the
+ // datastream sender and sharing them across all channels. These buffers are not used in
+ // "UNPARTITIONED" scheme.
+ OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];
+
+ // Index into 'outbound_batches_' for the next available OutboundRowBatch to serialize
+ // into. This is read and written by the main execution thread.
+ int next_batch_idx_ = 0;
+
+ // Synchronize accesses to the following fields between the main execution thread and
+ // the KRPC reactor thread. Note that there should be only one reactor thread invoking
+ // the callbacks for a channel so there should be no races between multiple reactor
+ // threads. Protect all subsequent fields.
+ SpinLock lock_;
+
+ // 'lock_' needs to be held when accessing the following fields.
+ // The client interface for making RPC calls to the remote DataStreamService.
+ std::unique_ptr<DataStreamServiceProxy> proxy_;
+
+ // Controller for managing properties of a single RPC call (such as features required
+ // in the remote servers) and passing the payloads to the actual OutboundCall object.
+ RpcController rpc_controller_;
+
+ // Protobuf response buffer for TransmitData() RPC.
+ TransmitDataResponsePB resp_;
+
+ // Protobuf response buffer for EndDataStream() RPC.
+ EndDataStreamResponsePB eos_resp_;
+
+ // Signaled when the in-flight RPC completes.
+ condition_variable_any rpc_done_cv_;
+
+ // Status of the most recently completed RPC.
+ Status rpc_status_;
+
+ // The pointer to the current serialized row batch being sent.
+ const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;
+
+ // True if there is an in-flight RPC.
+ bool rpc_in_flight_ = false;
+
+ // True if the channel is being shut down or shut down already.
+ bool shutdown_ = false;
+
+ // True if the remote receiver is closed already. In which case, all rows would
+ // be dropped silently.
+ // TODO: Fix IMPALA-3990
+ bool remote_recvr_closed_ = false;
+
+ // Returns true if the channel should terminate because the parent sender
+ // has been closed or cancelled.
+ bool ShouldTerminate() const { return shutdown_ || parent_->state_->is_cancelled(); }
+
+ // Send the rows accumulated in the internal row batch. This will serialize the
+ // internal row batch before sending them to the destination. This may block if
+ // the preceding RPC is still in progress. Returns error status if serialization
+ // fails or if the preceding RPC fails.
+ Status SendCurrentBatch();
+
+ // Called when an RPC failed. If it turns out that the RPC failed because the
+ // remote server is too busy, this function will schedule RetryCb() to be called
+ // after FLAGS_rpc_retry_interval_ms milliseconds, which in turn re-invokes the RPC.
+ // Otherwise, it will call MarkDone() to mark the RPC as done and failed.
+ // 'controller_status' is a Kudu status returned from the KRPC layer.
+ // 'rpc_fn' is a worker function which initializes the RPC parameters and invokes
+ // the actual RPC when the RPC is rescheduled.
+ // 'err_msg' (named 'prepend' in the definition) is an error message to be prepended
+ // to the status converted from the Kudu status 'controller_status'.
+ void HandleFailedRPC(const DoRpcFn& rpc_fn, const kudu::Status& controller_status,
+ const string& err_msg);
+
+ // Waits for the preceding RPC to complete. Expects to be called with 'lock_' held.
+ // May drop the lock while waiting for the RPC to complete. Return error status if
+ // the preceding RPC fails. Returns CANCELLED if the parent sender is cancelled or
+ // shut down. Returns OK otherwise. This should be only called from a fragment
+ // executor thread.
+ Status WaitForRpc(std::unique_lock<SpinLock>* lock);
+
+ // A callback function called from KRPC reactor thread to retry an RPC which failed
+ // previously due to remote server being too busy. This will re-arm the request
+ // parameters of the RPC. The retry may not happen if the callback has been aborted
+ // internally by KRPC code (e.g. the reactor thread was being shut down) or if the
+ // parent sender has been cancelled or closed since the scheduling of this callback.
+ // In which case, MarkDone() will be called with the error status and the RPC is
+ // considered complete. 'status' is the error status passed by KRPC code in case the
+ // callback was aborted.
+ void RetryCb(DoRpcFn rpc_fn, const kudu::Status& status);
+
+ // A callback function called from KRPC reactor threads upon completion of an in-flight
+ // TransmitData() RPC. This is called when the remote server responds to the RPC or
+ // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
+ // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
+ // response. HandleFailedRPC() is called to handle failed KRPC call. The RPC may be
+ // rescheduled if it's due to remote server being too busy.
+ void TransmitDataCompleteCb();
+
+ // Initializes the parameters for TransmitData() RPC and invokes the async RPC call.
+ // It will add 'tuple_offsets_' and 'tuple_data_' in 'rpc_in_flight_batch_' as sidecars
+ // to the RpcController and store the sidecars' indices to TransmitDataRequestPB sent as
+ // part of the RPC. Returns error status if adding sidecars to the RpcController failed.
+ Status DoTransmitDataRpc();
+
+ // A callback function called from KRPC reactor threads upon completion of an in-flight
+ // EndDataStream() RPC. This is called when the remote server responds to the RPC or
+ // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
+ // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
+ // response. HandleFailedRPC() is called to handle failed KRPC calls. The RPC may be
+ // rescheduled if it's due to remote server being too busy.
+ void EndDataStreamCompleteCb();
+
+ // Initializes the parameters for EndDataStream() RPC and invokes the async RPC call.
+ Status DoEndDataStreamRpc();
+
+ // Marks the in-flight RPC as completed, updates 'rpc_status_' with the status of the
+ // RPC (indicated in parameter 'status') and notifies any thread waiting for RPC
+ // completion. Expects to be called with 'lock_' held. Called in the context of a
+ // reactor thread.
+ void MarkDone(const Status& status);
+};
+
+Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
+  // Size 'batch_' so that about 'per_channel_buffer_size_' bytes of fixed-length tuple
+  // data fit into it. The row size is clamped to 1 to avoid dividing by zero, and the
+  // batch always holds at least one row.
+  // TODO: take into account of var-len data at runtime.
+  const int row_size = max(row_desc_->GetRowSize(), 1);
+  const int capacity = max(1, parent_->per_channel_buffer_size_ / row_size);
+  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
+
+  // Obtain a DataStreamService proxy for the destination from the process's RpcMgr.
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address_, &proxy_));
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
+  // The caller holds 'lock_'. Clear the in-flight state and record the outcome before
+  // waking a waiter; the waiter only re-examines these fields after reacquiring 'lock_'.
+  rpc_in_flight_batch_ = nullptr;
+  rpc_in_flight_ = false;
+  rpc_status_ = status;
+  rpc_done_cv_.notify_one();
+}
+
+Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+
+  SCOPED_TIMER(parent_->state_->total_network_send_timer());
+
+  // Sleep in 50ms slices so that closing or cancelling the parent sender is noticed
+  // even without a notification. wait_for() releases '*lock' while sleeping and
+  // reacquires it before returning.
+  const std::chrono::milliseconds poll_interval(50);
+  for (;;) {
+    if (!rpc_in_flight_ || ShouldTerminate()) break;
+    rpc_done_cv_.wait_for(*lock, poll_interval);
+  }
+
+  if (UNLIKELY(ShouldTerminate())) {
+    // The data stream sender is single-threaded, so Teardown() cannot have set
+    // 'shutdown_' while this thread was waiting.
+    DCHECK(!shutdown_);
+    return Status::CANCELLED;
+  }
+
+  DCHECK(!rpc_in_flight_);
+  if (LIKELY(rpc_status_.ok())) return Status::OK();
+  LOG(ERROR) << "channel send status: " << rpc_status_.GetDetail();
+  return rpc_status_;
+}
+
+void KrpcDataStreamSender::Channel::RetryCb(
+    DoRpcFn rpc_fn, const kudu::Status& cb_status) {
+  COUNTER_ADD(parent_->rpc_retry_counter_, 1);
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  Status status;
+  if (UNLIKELY(!cb_status.ok())) {
+    // KRPC aborted the scheduled callback, e.g. because the reactor thread is shutting
+    // down.
+    status = FromKuduStatus(cb_status, "KRPC retry failed");
+  } else if (UNLIKELY(ShouldTerminate())) {
+    // The parent sender was closed or cancelled while the retry was pending.
+    status = Status::CANCELLED;
+  } else {
+    // Re-issue the RPC. If it was issued, its completion callback finishes the job.
+    status = rpc_fn();
+    if (LIKELY(status.ok())) return;
+  }
+  MarkDone(status);
+}
+
+void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
+    const kudu::Status& controller_status, const string& prepend) {
+  // TODO: IMPALA-6159. Handle 'connection reset by peer' due to stale connections.
+  if (!RpcMgr::IsServerTooBusy(rpc_controller_)) {
+    // Anything other than a busy remote service is a final failure of this RPC.
+    MarkDone(FromKuduStatus(controller_status, prepend));
+    return;
+  }
+  // The destination is busy: have RetryCb() run on a reactor thread after
+  // FLAGS_rpc_retry_interval_ms. ShouldTerminate() is not consulted here because
+  // RetryCb() checks it before re-issuing the RPC.
+  ExecEnv::GetInstance()->rpc_mgr()->messenger()->ScheduleOnReactor(
+      boost::bind(&KrpcDataStreamSender::Channel::RetryCb, this, rpc_fn, _1),
+      MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms));
+}
+
+void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  const kudu::Status controller_status = rpc_controller_.status();
+  if (UNLIKELY(!controller_status.ok())) {
+    // The call failed at the KRPC layer; either schedule a retry or fail the RPC.
+    DoRpcFn retry_fn =
+        boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this);
+    const string prepend =
+        Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_));
+    HandleFailedRPC(retry_fn, controller_status, prepend);
+    return;
+  }
+
+  // The remote service responded; interpret the application-level status code.
+  const int32_t code = resp_.status().status_code();
+  Status outcome = Status::OK();
+  if (code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
+    // The receiver is gone; later batches are dropped instead of sent.
+    remote_recvr_closed_ = true;
+  } else if (code != TErrorCode::OK) {
+    outcome = Status(resp_.status());
+  } else {
+    DCHECK(rpc_in_flight_batch_ != nullptr);
+    num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
+    VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
+  }
+  MarkDone(outcome);
+}
+
+// Builds the TransmitData() request for 'rpc_in_flight_batch_' and issues it
+// asynchronously; TransmitDataCompleteCb() runs on completion. Expects 'lock_' to be
+// held. Returns an error if a sidecar could not be attached, in which case no RPC was
+// issued.
+Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
+  DCHECK(rpc_in_flight_batch_ != nullptr);
+  DCHECK(rpc_in_flight_batch_->IsInitialized());
+
+  // Initialize some constant fields in the request protobuf.
+  TransmitDataRequestPB req;
+  UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id();
+  finstance_id_pb->set_lo(fragment_instance_id_.lo);
+  finstance_id_pb->set_hi(fragment_instance_id_.hi);
+  req.set_sender_id(parent_->sender_id_);
+  req.set_dest_node_id(dest_node_id_);
+
+  rpc_controller_.Reset();
+  int sidecar_idx;
+  // Add 'tuple_offsets_' as sidecar.
+  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(RpcSidecar::FromSlice(
+      rpc_in_flight_batch_->TupleOffsetsAsSlice()), &sidecar_idx),
+      "Unable to add tuple offsets to sidecar");
+  req.set_tuple_offsets_sidecar_idx(sidecar_idx);
+
+  // Add 'tuple_data_' as sidecar.
+  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
+      RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
+      "Unable to add tuple data to sidecar");
+  req.set_tuple_data_sidecar_idx(sidecar_idx);
+
+  // Lend the RowBatchHeaderPB, which is owned by 'rpc_in_flight_batch_', to 'req' only
+  // after every early return above. set_allocated_*() makes 'req' the owner, so if 'req'
+  // went out of scope while still holding it (as on the error paths when this was done
+  // first), its destructor would delete memory the outbound batch still owns.
+  req.set_allocated_row_batch_header(
+      const_cast<RowBatchHeaderPB*>(rpc_in_flight_batch_->header()));
+
+  resp_.Clear();
+  proxy_->TransmitDataAsync(req, &resp_, &rpc_controller_,
+      boost::bind(&KrpcDataStreamSender::Channel::TransmitDataCompleteCb, this));
+  // Take the header back from 'req' so that its destructor does not delete it.
+  req.release_row_batch_header();
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::TransmitData(
+    const OutboundRowBatch* outbound_batch) {
+  VLOG_ROW << "Channel::TransmitData() finst_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows=" << outbound_batch->header()->num_rows();
+  std::unique_lock<SpinLock> l(lock_);
+  RETURN_IF_ERROR(WaitForRpc(&l));
+  DCHECK(!rpc_in_flight_);
+  DCHECK(rpc_in_flight_batch_ == nullptr);
+  // If the remote receiver is closed already, there is no point in sending anything.
+  // TODO: Needs better solution for IMPALA-3990 in the long run.
+  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+  rpc_in_flight_ = true;
+  rpc_in_flight_batch_ = outbound_batch;
+  Status status = DoTransmitDataRpc();
+  if (UNLIKELY(!status.ok())) {
+    // The RPC was never issued, so no completion callback will ever clear the in-flight
+    // state. Clear it here (recording the failure); otherwise the next WaitForRpc()
+    // would spin until the sender is cancelled and Teardown() would wait forever.
+    MarkDone(status);
+    return status;
+  }
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) {
+  // Serialize into the buffer that is not referenced by the in-flight RPC (if any).
+  OutboundRowBatch* const outbound_batch = outbound_batches_ + next_batch_idx_;
+  DCHECK(outbound_batch != rpc_in_flight_batch_);
+  RETURN_IF_ERROR(parent_->SerializeBatch(batch, outbound_batch));
+  RETURN_IF_ERROR(TransmitData(outbound_batch));
+  // Advance to the next buffer, wrapping around, only after a successful hand-off.
+  if (++next_batch_idx_ == NUM_OUTBOUND_BATCHES) next_batch_idx_ = 0;
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
+  // Ship the accumulated rows, then empty 'batch_' so it can be refilled. On failure the
+  // batch is left as it was.
+  Status status = SerializeAndSendBatch(batch_.get());
+  if (UNLIKELY(!status.ok())) return status;
+  batch_->Reset();
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+  // Make room first: a full batch is sent before the new row is appended.
+  if (batch_->AtCapacity()) RETURN_IF_ERROR(SendCurrentBatch());
+
+  TupleRow* dest = batch_->GetRow(batch_->AddRow());
+  auto* pool = batch_->tuple_data_pool();
+  const vector<TupleDescriptor*>& tuple_descs = row_desc_->tuple_descriptors();
+  for (int idx = 0; idx < tuple_descs.size(); ++idx) {
+    auto* src = row->GetTuple(idx);
+    // NULL tuples stay NULL; all others are deep-copied into the batch's own pool.
+    dest->SetTuple(idx, src == nullptr ? nullptr : src->DeepCopy(*tuple_descs[idx], pool));
+  }
+  batch_->CommitLastRow();
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  const kudu::Status controller_status = rpc_controller_.status();
+  if (UNLIKELY(!controller_status.ok())) {
+    // The call failed at the KRPC layer; either schedule a retry or fail the RPC.
+    DoRpcFn retry_fn =
+        boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
+    HandleFailedRPC(retry_fn, controller_status,
+        Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_)));
+    return;
+  }
+  // The remote service responded; its reported status is the outcome of the EOS RPC.
+  MarkDone(Status(eos_resp_.status()));
+}
+
+Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
+  DCHECK(rpc_in_flight_);
+  // Identify this sender and the destination exchange node to the remote service.
+  EndDataStreamRequestPB eos_req;
+  eos_req.set_sender_id(parent_->sender_id_);
+  eos_req.set_dest_node_id(dest_node_id_);
+  UniqueIdPB* finst_id = eos_req.mutable_dest_fragment_instance_id();
+  finst_id->set_hi(fragment_instance_id_.hi);
+  finst_id->set_lo(fragment_instance_id_.lo);
+
+  // Issue the RPC asynchronously; EndDataStreamCompleteCb() handles its completion.
+  rpc_controller_.Reset();
+  eos_resp_.Clear();
+  proxy_->EndDataStreamAsync(eos_req, &eos_resp_, &rpc_controller_,
+      boost::bind(&KrpcDataStreamSender::Channel::EndDataStreamCompleteCb, this));
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
+  VLOG_RPC << "Channel::FlushAndSendEos() instance_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows= " << batch_->num_rows();
+
+  // We can return an error here and not go on to send the EOS RPC because the error that
+  // we returned will be sent to the coordinator who will then cancel all the remote
+  // fragments including the one that this sender is sending to.
+  if (batch_->num_rows() > 0) RETURN_IF_ERROR(SendCurrentBatch());
+  {
+    std::unique_lock<SpinLock> l(lock_);
+    RETURN_IF_ERROR(WaitForRpc(&l));
+    DCHECK(!rpc_in_flight_);
+    if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+    VLOG_RPC << "calling EndDataStream() to terminate channel.";
+    rpc_in_flight_ = true;
+    Status status = DoEndDataStreamRpc();
+    if (UNLIKELY(!status.ok())) {
+      // No RPC was issued, so no callback will clear 'rpc_in_flight_'. Clear it here so
+      // that Teardown() does not wait forever for a completion that never comes.
+      MarkDone(status);
+      return status;
+    }
+    // Block until the EOS RPC completes and surface its status.
+    RETURN_IF_ERROR(WaitForRpc(&l));
+  }
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
+  // Normally FlushAndSendEos() has already drained this channel. If the fragment was
+  // closed or cancelled instead, an RPC may still be in flight and 'batch_' may still
+  // hold buffered rows, which are dropped below.
+  std::unique_lock<SpinLock> l(lock_);
+  shutdown_ = true;
+  if (rpc_in_flight_) {
+    // Cancel the in-flight RPC and block until it is marked done (by its completion
+    // callback, or by a pending RetryCb() that observes 'shutdown_').
+    rpc_controller_.Cancel();
+    rpc_done_cv_.wait(l, [this]() { return !rpc_in_flight_; });
+  }
+  batch_.reset();
+}
+
+KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
+    int per_channel_buffer_size)
+  : DataSink(row_desc),
+    sender_id_(sender_id),
+    partition_type_(sink.output_partition.type),
+    per_channel_buffer_size_(per_channel_buffer_size),
+    dest_node_id_(sink.dest_node_id),
+    next_unknown_partition_(0) {
+  DCHECK_GT(destinations.size(), 0);
+  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
+      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
+      || sink.output_partition.type == TPartitionType::RANDOM
+      || sink.output_partition.type == TPartitionType::KUDU);
+
+  // Create one channel per destination; the sender owns them (freed in the d'tor).
+  for (const TPlanFragmentDestination& dest : destinations) {
+    Channel* channel = new Channel(this, row_desc, dest.krpc_server,
+        dest.fragment_instance_id, sink.dest_node_id, per_channel_buffer_size);
+    channels_.push_back(channel);
+  }
+
+  const bool shuffle_channels = partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM;
+  if (shuffle_channels) {
+    // Visit channels in a randomized order when opening/transmitting, to avoid
+    // thundering herd problems.
+    srand(reinterpret_cast<uint64_t>(this));
+    random_shuffle(channels_.begin(), channels_.end());
+  }
+}
+
+string KrpcDataStreamSender::GetName() {
+  // Human-readable sink name that includes the destination plan node id.
+  string sink_name = Substitute("KrpcDataStreamSender (dst_id=$0)", dest_node_id_);
+  return sink_name;
+}
+
+KrpcDataStreamSender::~KrpcDataStreamSender() {
+  // TODO: check that sender was either already closed() or there was an error
+  // on some channel
+  // The channels were heap-allocated in the constructor and are owned by the sender.
+  for (Channel* channel : channels_) delete channel;
+}
+
+Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  DCHECK(tsink.__isset.stream_sink);
+  // Only hash and Kudu partitioning route rows by evaluating partition expressions;
+  // the other partition types need none.
+  const bool uses_partition_exprs = partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::KUDU;
+  if (!uses_partition_exprs) return Status::OK();
+  RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
+      *row_desc_, state, &partition_exprs_));
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  state_ = state;
+  SCOPED_TIMER(profile_->total_time_counter());
+  // One evaluator per partition expression created in Init() (the list is empty unless
+  // the sink is hash- or Kudu-partitioned).
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_expr_evals_));
+
+  // Register the sender's profile counters.
+  RuntimeProfile* const sink_profile = profile();
+  serialize_batch_timer_ = ADD_TIMER(sink_profile, "SerializeBatchTime");
+  rpc_retry_counter_ = ADD_COUNTER(sink_profile, "RpcRetry", TUnit::UNIT);
+  bytes_sent_counter_ = ADD_COUNTER(sink_profile, "BytesSent", TUnit::BYTES);
+  uncompressed_bytes_counter_ =
+      ADD_COUNTER(sink_profile, "UncompressedRowBatchSize", TUnit::BYTES);
+  total_sent_rows_counter_ = ADD_COUNTER(sink_profile, "RowsReturned", TUnit::UNIT);
+  // Bytes sent divided by the sink's total time.
+  overall_throughput_ = sink_profile->AddDerivedCounter("OverallThroughput",
+      TUnit::BYTES_PER_SECOND, bind<int64_t>(&RuntimeProfile::UnitsPerSecond,
+          bytes_sent_counter_, sink_profile->total_time_counter()));
+
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->Init(state));
+  }
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Open(RuntimeState* state) {
+  // Opens the partition expression evaluators created in Prepare().
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_expr_evals_, state));
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+  DCHECK(!closed_);
+  DCHECK(!flushed_);
+
+  if (batch->num_rows() == 0) return Status::OK();
+  const int num_channels = channels_.size();
+  if (partition_type_ == TPartitionType::UNPARTITIONED) {
+    // Broadcast: serialize the batch once and hand the same serialized batch to every
+    // channel.
+    OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
+    // The serialized batch goes to 'num_channels' receivers, so the byte counters must
+    // count it once per receiver (see SerializeBatch()).
+    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, num_channels));
+    // TransmitData() will block if there are still in-flight rpcs (and those will
+    // reference the previously written serialized batch).
+    for (Channel* channel : channels_) {
+      RETURN_IF_ERROR(channel->TransmitData(outbound_batch));
+    }
+    next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+  } else if (partition_type_ == TPartitionType::RANDOM || num_channels == 1) {
+    // Round-robin batches among channels. Wait for the current channel to finish its
+    // rpc before overwriting its batch.
+    Channel* current_channel = channels_[current_channel_idx_];
+    RETURN_IF_ERROR(current_channel->SerializeAndSendBatch(batch));
+    current_channel_idx_ = (current_channel_idx_ + 1) % num_channels;
+  } else if (partition_type_ == TPartitionType::KUDU) {
+    DCHECK_EQ(partition_expr_evals_.size(), 1);
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      int32_t partition =
+          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+      if (partition < 0) {
+        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
+        // Round-robin such rows across channels. The counter is kept within
+        // [0, num_channels) so that it can never overflow (signed overflow is UB) and
+        // produce a negative channel index.
+        partition = next_unknown_partition_;
+        next_unknown_partition_ = (next_unknown_partition_ + 1) % num_channels;
+      }
+      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
+    }
+  } else {
+    DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
+    // hash-partition batch's rows across channels
+    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
+    // once we have codegen here.
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      uint64_t hash_val = EXCHANGE_HASH_SEED;
+      for (int j = 0; j < partition_exprs_.size(); ++j) {
+        ScalarExprEvaluator* eval = partition_expr_evals_[j];
+        void* partition_val = eval->GetValue(row);
+        // We can't use the crc hash function here because it does not result in
+        // uncorrelated hashes with different seeds. Instead we use FastHash.
+        // TODO: fix crc hash/GetHashValue()
+        DCHECK(&(eval->root()) == partition_exprs_[j]);
+        hash_val = RawValue::GetHashValueFastHash(
+            partition_val, partition_exprs_[j]->type(), hash_val);
+      }
+      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
+    }
+  }
+  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
+  expr_results_pool_->Clear();
+  RETURN_IF_ERROR(state->CheckQueryState());
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+  DCHECK(!flushed_);
+  DCHECK(!closed_);
+  flushed_ = true;
+  // Flush each channel and send it EOS. On the first error we return without closing
+  // the remaining channels: the error goes back to the coordinator, which cancels the
+  // query, and that cancellation closes whatever channels are still open.
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->FlushAndSendEos(state));
+  }
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Close(RuntimeState* state) {
+  // Repeated calls are no-ops.
+  if (closed_) return;
+  // Tear down every channel (cancelling any in-flight RPC) before releasing the
+  // partition expressions and the base sink's resources.
+  for (Channel* channel : channels_) channel->Teardown(state);
+  ScalarExprEvaluator::Close(partition_expr_evals_, state);
+  ScalarExpr::Close(partition_exprs_);
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+Status KrpcDataStreamSender::SerializeBatch(
+    RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
+  VLOG_ROW << "serializing " << src->num_rows() << " rows";
+  {
+    // Serialization time is charged both to the sink's total time and to the
+    // dedicated serialization timer.
+    SCOPED_TIMER(profile_->total_time_counter());
+    SCOPED_TIMER(serialize_batch_timer_);
+    RETURN_IF_ERROR(src->Serialize(dest));
+    const int64_t serialized_size = RowBatch::GetSerializedSize(*dest);
+    const int64_t deserialized_size = RowBatch::GetDeserializedSize(*dest);
+    // The serialized batch is sent to 'num_receivers' receivers; count each copy.
+    COUNTER_ADD(bytes_sent_counter_, serialized_size * num_receivers);
+    COUNTER_ADD(uncompressed_bytes_counter_, deserialized_size * num_receivers);
+  }
+  return Status::OK();
+}
+
+int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
+  // Sum of the per-channel byte counts.
+  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
+  // atomic?
+  int64_t total_bytes = 0;
+  for (Channel* channel : channels_) total_bytes += channel->num_data_bytes_sent();
+  return total_bytes;
+}
+
+} // namespace impala
+
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
new file mode 100644
index 0000000..1a8b30f
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+#define IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+
+#include <vector>
+#include <string>
+
+#include "exec/data-sink.h"
+#include "common/global-types.h"
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "runtime/row-batch.h"
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+class RowDescriptor;
+class MemTracker;
+class TDataStreamSink;
+class TNetworkAddress;
+class TPlanFragmentDestination;
+
+/// Single sender of an m:n data stream.
+///
+/// Row batch data is routed to destinations based on the provided partitioning
+/// specification.
+/// *Not* thread-safe.
+///
+/// TODO: capture stats that describe distribution of rows/data volume
+/// across channels.
+/// TODO: create a PlanNode equivalent class for DataSink.
+class KrpcDataStreamSender : public DataSink {
+ public:
+  /// Construct a sender according to the output specification (tsink), sending to the
+  /// given destinations:
+  /// 'sender_id' identifies this sender instance, and is unique within a fragment.
+  /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
+  /// 'destinations' are the receivers' network addresses. There is one channel for each
+  /// destination.
+  /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
+  /// per-channel's accumulating row batch before it will be sent.
+  /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
+  /// RANDOM and KUDU.
+  KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+      const TDataStreamSink& tsink,
+      const std::vector<TPlanFragmentDestination>& destinations,
+      int per_channel_buffer_size);
+
+  virtual ~KrpcDataStreamSender();
+
+  virtual std::string GetName();
+
+  /// Initializes the sender: creates the evaluators for the partitioning expressions,
+  /// allocates the profile counters and initializes all the channels. Returns an error
+  /// status if any of these steps fails.
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+
+  /// Opens the evaluators of the partitioning expressions. Returns an error status if
+  /// that fails.
+  virtual Status Open(RuntimeState* state);
+
+  /// Flush all buffered data and close all existing channels to destination hosts.
+  /// Further Send() calls are illegal after calling FlushFinal(). It is legal to call
+  /// FlushFinal() no more than once. Returns an error status if flushing the buffered
+  /// data or the end of stream call failed.
+  virtual Status FlushFinal(RuntimeState* state);
+
+  /// Send data in 'batch' to destination nodes according to partitioning
+  /// specification provided in c'tor.
+  /// Blocks until all rows in batch are placed in their appropriate outgoing
+  /// buffers (ie, blocks if there are still in-flight rpcs from the last
+  /// Send() call).
+  virtual Status Send(RuntimeState* state, RowBatch* batch);
+
+  /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are
+  /// illegal after calling Close(). Calling Close() more than once is a no-op.
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  friend class DataStreamTest;
+
+  /// Creates the partitioning expressions from the sink's output partition spec in
+  /// 'tsink' (only for HASH_PARTITIONED and KUDU partitioning) and stores them in
+  /// 'partition_exprs_'. 'thrift_output_exprs' is not used by this sink. Returns error
+  /// status if the creation failed.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state);
+
+  /// Return total number of bytes sent. If batches are broadcast to multiple receivers,
+  /// they are counted once per receiver.
+  int64_t GetNumDataBytesSent() const;
+
+ private:
+  class Channel;
+
+  /// Serializes the src batch into the serialized row batch 'dest' and updates
+  /// various stat counters.
+  /// 'num_receivers' is the number of receivers this batch will be sent to. Used for
+  /// updating the stat counters.
+  Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int num_receivers = 1);
+
+  /// Sender instance id, unique within a fragment.
+  const int sender_id_;
+
+  /// The type of partitioning to perform.
+  const TPartitionType::type partition_type_;
+
+  /// Amount of per-channel buffering for rows before sending them to the destination.
+  const int per_channel_buffer_size_;
+
+  /// RuntimeState of the fragment instance.
+  RuntimeState* state_ = nullptr;
+
+  /// Index of the current channel to send to when whole batches are round-robined
+  /// across channels (RANDOM partitioning, or when there is only a single channel).
+  int current_channel_idx_ = 0;
+
+  /// Index of the next OutboundRowBatch to use for serialization.
+  int next_batch_idx_ = 0;
+
+  /// The outbound row batches are double-buffered so that we can serialize the next
+  /// batch while the other is still referenced by the in-flight RPC. Each entry contains
+  /// a RowBatchHeaderPB and buffers for the serialized tuple offsets and data. Used only
+  /// when the partitioning strategy is UNPARTITIONED.
+  static const int NUM_OUTBOUND_BATCHES = 2;
+  OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];
+
+  /// If true, this sender has called FlushFinal() successfully.
+  /// Not valid to call Send() anymore.
+  bool flushed_ = false;
+
+  /// If true, this sender has been closed. Not valid to call Send() anymore.
+  bool closed_ = false;
+
+  /// List of all channels. One for each destination. Owned by the sender.
+  std::vector<Channel*> channels_;
+
+  /// Expressions of partition keys. They are used to compute the per-row partition
+  /// values for shuffling exchanges (HASH_PARTITIONED and KUDU partitioning only).
+  std::vector<ScalarExpr*> partition_exprs_;
+  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
+
+  /// Time for serializing row batches.
+  RuntimeProfile::Counter* serialize_batch_timer_ = nullptr;
+
+  /// Number of TransmitData() RPC retries due to remote service being busy.
+  RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
+
+  /// Total number of bytes sent.
+  RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
+
+  /// Total number of bytes of the row batches before compression.
+  RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
+
+  /// Total number of rows sent.
+  RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
+
+  /// Throughput per total time spent in sender
+  RuntimeProfile::Counter* overall_throughput_ = nullptr;
+
+  /// Identifier of the destination plan node.
+  PlanNodeId dest_node_id_;
+
+  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
+  /// or when errors are encountered.
+  int next_unknown_partition_;
+
+  /// An arbitrary hash seed used for exchanges.
+  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
+};
+
+} // namespace impala
+
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H