You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2017/11/09 22:55:37 UTC

[2/3] incubator-impala git commit: IMPALA-4856: Port data stream service to KRPC

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 0731d45..68f00e3 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -17,46 +17,596 @@
 
 #include "runtime/krpc-data-stream-recvr.h"
 
-#include "common/logging.h"
+#include <condition_variable>
+#include <queue>
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/sorted-run-merger.h"
+#include "util/runtime-profile-counters.h"
+#include "util/periodic-counter-updater.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+
+#include "common/names.h"
 
 DECLARE_bool(use_krpc);
+DECLARE_int32(datastream_service_num_deserialization_threads);
+
+using kudu::rpc::RpcContext;
+using std::condition_variable_any;
 
 namespace impala {
 
-[[noreturn]] static void AbortUnsupportedFeature() {
-  // We should have gotten here only if the FLAGS_use_krpc is set to true.
-  CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 'use_krpc' "
-      "is true.";
-  // KRPC isn't supported yet, so abort.
-  ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag to "
-      "false and restart the cluster.");
+// Implements a FIFO queue of row batches from one or more senders. One queue is
+// maintained per sender if is_merging_ is true for the enclosing receiver, otherwise rows
+// from all senders are placed in the same queue.
+//
+// Batches are added by senders via AddBatch(), and removed by an enclosing
+// KrpcDataStreamRecvr via GetBatch(). There is a soft limit for the total amount of
+// memory consumed by buffered row batches in all sender queues of a receiver. If adding
+// a batch will push the memory consumption beyond the limit, that RPC is added to the
+// 'deferred batches' queue, which will be drained in FIFO order when space opens up.
+// Senders in that state will not be replied to until their row batches are deserialized
+// or the receiver is cancelled. This ensures that only one batch per sender is buffered
+// in the deferred batches queue.
+class KrpcDataStreamRecvr::SenderQueue {
+ public:
+  SenderQueue(KrpcDataStreamRecvr* parent_recvr, int num_senders);
+
+  // Returns the next batch from this sender queue and stores it in 'current_batch_'.
+  // A returned batch that is not filled to capacity does *not* indicate end-of-stream.
+  // The call blocks until another batch arrives or all senders close their channels.
+  // The returned batch is owned by the sender queue. The caller must acquire the
+  // resources from the returned batch before the next call to GetBatch().
+  Status GetBatch(RowBatch** next_batch);
+
+  // Adds a new row batch to this sender queue if this stream has not been cancelled.
+  // If adding this batch causes us to exceed the receiver's buffer limit, the RPC state
+  // is copied into 'deferred_rpcs_' for deferred processing and this function returns
+  // immediately. The deferred RPCs are replied to later when space becomes available.
+  void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      RpcContext* context);
+
+  // Tries inserting the front of 'deferred_rpcs_' queue into 'batch_queue_' if possible.
+  // On success, the first entry of 'deferred_rpcs_' is removed and the sender of the RPC
+  // will be responded to. If the serialized row batch fails to be extracted from the
+  // entry, the error status will be sent as reply.
+  void DequeueDeferredRpc();
+
+  // Takes over the RPC state 'ctx' of an early sender for deferred processing and
+  // kicks off a deserialization task to process it asynchronously. The ownership of
+  // 'ctx' is transferred to this sender queue.
+  void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
+
+  // Decrements the number of remaining senders for this queue and signals any threads
+  // waiting on the arrival of a new batch if the count drops to 0. The number of senders
+  // will be 1 for a merging KrpcDataStreamRecvr.
+  void DecrementSenders();
+
+  // Sets cancellation flag and signals cancellation to receiver and sender. Subsequent
+  // incoming batches will be dropped and senders in 'deferred_rpcs_' are replied to.
+  void Cancel();
+
+  // Must be called once to cleanup any queued resources.
+  void Close();
+
+  // Returns the current batch from this queue being processed by a consumer.
+  RowBatch* current_batch() const { return current_batch_.get(); }
+
+ private:
+  // Returns true if either (1) 'batch_queue' is empty and there is no pending insertion
+  // or (2) inserting a row batch of 'batch_size' into 'batch_queue' will not cause the
+  // soft limit of the receiver to be exceeded. Expected to be called with lock_ held.
+  bool CanEnqueue(int64_t batch_size) const;
+
+  // Unpacks a serialized row batch from 'request' and 'rpc_context' and populates
+  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
+  // be stored in 'batch_size'. On failure, the error status is returned.
+  static Status UnpackRequest(const TransmitDataRequestPB* request,
+      RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data,
+      int64_t* batch_size);
+
+  // The workhorse function for deserializing a row batch represented by ('header',
+  // 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue'. Expects to be
+  // called with 'lock_' held and passed into this function via the argument 'lock'. This
+  // function may drop lock when deserializing the row batch and re-acquire it after
+  // the row batch is deserialized. 'batch_size' is the size in bytes of the deserialized
+  // row batch. The caller is expected to have called CanEnqueue() to make sure the row
+  // batch can be inserted without exceeding the soft limit of the receiver. Also notify
+  // a thread waiting on 'data_arrival_cv_'.
+  void AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
+      const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
+      unique_lock<SpinLock>* lock);
+
+  // Receiver of which this queue is a member.
+  KrpcDataStreamRecvr* recvr_;
+
+  // Protects all subsequent fields.
+  SpinLock lock_;
+
+  // If true, the receiver fragment for this stream got cancelled.
+  bool is_cancelled_ = false;
+
+  // Number of deserialization requests sent to deserialization threads to drain
+  // 'deferred_rpcs_' which are yet to be processed. Used to limit the number of
+  // requests queued.
+  int num_deserialize_tasks_pending_ = 0;
+
+  // Number of senders which haven't closed the channel yet
+  // (if it drops to 0, end-of-stream is true)
+  int num_remaining_senders_;
+
+  // Number of pending row batch insertions. AddBatchWork() may drop and reacquire
+  // 'lock_', causing races between multiple threads calling AddBatch() at the same time
+  // or between threads calling AddBatch() and threads calling Close() concurrently.
+  // AddBatchWork() increments this counter before dropping 'lock_' for deserializing
+  // the row batch. The counter is decremented after 'lock_' is re-acquired and the row
+  // batch is inserted into 'batch_queue'. The races are as follows:
+  //
+  // 1. Multiple threads inserting into an empty 'batch_queue' concurrently may all see
+  // it as empty before the first thread manages to insert into batch_queue. This may
+  // cause the soft limit to be exceeded. A queue is truly empty iff this counter is 0.
+  //
+  // 2. Close() cannot proceed until this counter is 0 to make sure all pending inserts
+  // complete before the 'batch_queue' is cleared.
+  int num_pending_enqueue_ = 0;
+
+  // Signal the arrival of new batch or the eos/cancelled condition.
+  condition_variable_any data_arrival_cv_;
+
+  // Queue of (batch length, batch) pairs. The SenderQueue owns the memory to these
+  // batches until they are handed off to the callers of GetBatch().
+  typedef list<pair<int, std::unique_ptr<RowBatch>>> RowBatchQueue;
+  RowBatchQueue batch_queue_;
+
+  // The batch that was most recently returned via GetBatch(), i.e. the current batch
+  // from this queue being processed by a consumer. It's destroyed when the next batch
+  // is retrieved.
+  scoped_ptr<RowBatch> current_batch_;
+
+  // Set to true when the first batch has been received
+  bool received_first_batch_ = false;
+
+  // Queue of deferred RPCs - those that have a batch to deliver, but the queue was
+  // full when they last tried to do so. The senders wait here until there is a space for
+  // their batches, allowing the receiver-side to implement basic flow-control.
+  std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
+};
+
+KrpcDataStreamRecvr::SenderQueue::SenderQueue(
+    KrpcDataStreamRecvr* parent_recvr, int num_senders)
+  : recvr_(parent_recvr), num_remaining_senders_(num_senders) { }
+
+Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
+  SCOPED_TIMER(recvr_->queue_get_batch_time_);
+  int num_to_dequeue = 0;
+  // The sender id is set below when we decide to dequeue entries from 'deferred_rpcs_'.
+  int sender_id = -1;
+  {
+    unique_lock<SpinLock> l(lock_);
+    // current_batch_ must be replaced with the returned batch.
+    current_batch_.reset();
+    *next_batch = nullptr;
+
+    // Wait until something shows up or we know we're done
+    while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) {
+      VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
+               << " node=" << recvr_->dest_node_id();
+      // Don't count time spent waiting on the sender as active time.
+      CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
+      CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
+      CANCEL_SAFE_SCOPED_TIMER(
+          received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
+          &is_cancelled_);
+      data_arrival_cv_.wait(l);
+    }
+
+    if (UNLIKELY(is_cancelled_)) {
+      // Cancellation should have drained the entire 'deferred_rpcs_' queue.
+      // Make sure the senders were replied to or they may be stuck waiting for a reply.
+      DCHECK(deferred_rpcs_.empty());
+      return Status::CANCELLED;
+    }
+
+    // All senders have sent their row batches. Nothing to do.
+    if (num_remaining_senders_ == 0 && batch_queue_.empty()) {
+      // Note that it's an invariant that a sender cannot send the EOS RPC until all
+      // outstanding TransmitData() RPCs have been replied to. Therefore, it should be
+      // impossible for num_remaining_senders_ to reach 0 before all RPCs in
+      // 'deferred_rpcs_' have been replied to.
+      DCHECK(deferred_rpcs_.empty());
+      DCHECK_EQ(num_pending_enqueue_, 0);
+      return Status::OK();
+    }
+
+    // Notify the deserialization threads to retry delivering the deferred RPCs.
+    if (!deferred_rpcs_.empty()) {
+      // Try dequeuing multiple entries from 'deferred_rpcs_' to parallelize the CPU
+      // bound deserialization work. No point in dequeuing more than number of
+      // deserialization threads available.
+      DCHECK_GE(deferred_rpcs_.size(), num_deserialize_tasks_pending_);
+      num_to_dequeue = min(FLAGS_datastream_service_num_deserialization_threads,
+          (int)deferred_rpcs_.size() - num_deserialize_tasks_pending_);
+      num_deserialize_tasks_pending_ += num_to_dequeue;
+      sender_id = deferred_rpcs_.front()->request->sender_id();
+    }
+
+    DCHECK(!batch_queue_.empty());
+    received_first_batch_ = true;
+    RowBatch* result = batch_queue_.front().second.release();
+    recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+    batch_queue_.pop_front();
+    VLOG_ROW << "fetched #rows=" << result->num_rows();
+    current_batch_.reset(result);
+    *next_batch = current_batch_.get();
+  }
+  // Don't hold lock when calling EnqueueDeserializeTask() as it may block.
+  // It's important that the dequeuing of 'deferred_rpcs_' is done after the entry
+  // has been removed from 'batch_queue_' or the deserialization threads may fail to
+  // insert into a non-empty 'batch_queue_' and the receiver will be waiting forever.
+  if (num_to_dequeue > 0) {
+    DCHECK_GE(sender_id, 0);
+    recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
+        recvr_->dest_node_id(), sender_id, num_to_dequeue);
+  }
+  return Status::OK();
 }
 
-[[noreturn]] KrpcDataStreamRecvr::KrpcDataStreamRecvr() {
-  AbortUnsupportedFeature();
+inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) const {
+  // The queue is truly empty iff there is no pending insert. It's important that we
+  // enqueue the new batch regardless of buffer limit if the queue is currently empty.
+  // In the case of a merging receiver, batches are received from a specific queue
+  // based on data order, and the pipeline will stall if the merger is waiting for data
+  // from an empty queue that cannot be filled because the limit has been reached.
+  bool queue_empty = batch_queue_.empty() && num_pending_enqueue_ == 0;
+  return queue_empty || !recvr_->ExceedsLimit(batch_size);
 }
 
-KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {
+Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
+    const TransmitDataRequestPB* request, RpcContext* rpc_context,
+    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) {
+  // Unpack the tuple offsets.
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
+      request->tuple_offsets_sidecar_idx(), tuple_offsets),
+      "Failed to get the tuple offsets sidecar");
+  // Unpack the tuple data.
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
+      request->tuple_data_sidecar_idx(), tuple_data),
+      "Failed to get the tuple data sidecar");
+  // Compute the size of the deserialized row batch.
+  *batch_size =
+      RowBatch::GetDeserializedSize(request->row_batch_header(), *tuple_offsets);
+  return Status::OK();
 }
 
-[[noreturn]] Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
+    const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets,
+    const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+
+  COUNTER_ADD(recvr_->num_accepted_batches_, 1);
+  COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
+  // Reserve queue space before dropping the lock below.
+  recvr_->num_buffered_bytes_.Add(batch_size);
+  DCHECK_GE(num_pending_enqueue_, 0);
+  ++num_pending_enqueue_;
+
+  // Deserialization may take some time due to compression and memory allocation.
+  // Drop the lock so we can deserialize multiple batches in parallel.
+  lock->unlock();
+  unique_ptr<RowBatch> batch;
+  {
+    SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
+    // At this point, the row batch will be inserted into batch_queue_. Close() will
+    // handle deleting any unconsumed batches from batch_queue_. Close() cannot proceed
+    // until there are no pending insertions to batch_queue_.
+    batch.reset(new RowBatch(recvr_->row_desc(), header, tuple_offsets, tuple_data,
+        recvr_->mem_tracker()));
+  }
+  lock->lock();
+
+  DCHECK_GT(num_pending_enqueue_, 0);
+  --num_pending_enqueue_;
+  VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+  batch_queue_.emplace_back(batch_size, move(batch));
+  data_arrival_cv_.notify_one();
 }
 
-[[noreturn]] void KrpcDataStreamRecvr::Close() {
-  AbortUnsupportedFeature();
+void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  // TODO: Add timers for time spent in this function and queue time in 'batch_queue_'.
+  const RowBatchHeaderPB& header = request->row_batch_header();
+  kudu::Slice tuple_offsets;
+  kudu::Slice tuple_data;
+  int64_t batch_size;
+  Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
+      &batch_size);
+  if (UNLIKELY(!status.ok())) {
+    status.ToProto(response->mutable_status());
+    rpc_context->RespondSuccess();
+    return;
+  }
+
+  {
+    unique_lock<SpinLock> l(lock_);
+    // There should be one or more senders left when this function is called because the
+    // EndDataStream RPC is not sent until all outstanding TransmitData() RPCs have been
+    // replied to, and there is at least one TransmitData() RPC which hasn't yet been
+    // responded to if we reach here.
+    DCHECK_GT(num_remaining_senders_, 0);
+    if (UNLIKELY(is_cancelled_)) {
+      Status::OK().ToProto(response->mutable_status());
+      rpc_context->RespondSuccess();
+      return;
+    }
+
+    // If there's something in the queue and this batch will push us over the buffer
+    // limit we need to wait until the queue gets drained. We store the RPC state
+    // in 'deferred_rpcs_' so that DequeueDeferredRpc() can insert the batch and reply
+    // to the sender later. If there are already deferred RPCs waiting in queue, the new
+    // batch needs to line up after the deferred RPCs to avoid starvation of senders
+    // in the non-merging case.
+    if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
+      auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
+      deferred_rpcs_.push(move(payload));
+      COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+      return;
+    }
+
+    // At this point, we are committed to inserting the row batch into 'batch_queue_'.
+    AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+  }
+
+  // Respond to the sender to ack the insertion of the row batches.
+  Status::OK().ToProto(response->mutable_status());
+  rpc_context->RespondSuccess();
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+  // Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
+  std::unique_ptr<TransmitDataCtx> ctx;
+  {
+    unique_lock<SpinLock> l(lock_);
+    DCHECK_GT(num_deserialize_tasks_pending_, 0);
+    --num_deserialize_tasks_pending_;
+
+    // Returns if the queue has been cancelled or if it's empty.
+    if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return;
+
+    // Try enqueuing the first entry into 'batch_queue_'.
+    ctx.swap(deferred_rpcs_.front());
+    kudu::Slice tuple_offsets;
+    kudu::Slice tuple_data;
+    int64_t batch_size;
+    Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
+        &tuple_data, &batch_size);
+    // Reply with error status if the entry cannot be unpacked.
+    if (UNLIKELY(!status.ok())) {
+      status.ToProto(ctx->response->mutable_status());
+      ctx->rpc_context->RespondSuccess();
+      deferred_rpcs_.pop();
+      return;
+    }
+
+    // Stops if inserting the batch causes us to go over the limit.
+    // Put 'ctx' back on the queue.
+    if (!CanEnqueue(batch_size)) {
+      ctx.swap(deferred_rpcs_.front());
+      DCHECK(deferred_rpcs_.front().get() != nullptr);
+      return;
+    }
+
+    // Dequeues the deferred batch and adds it to 'batch_queue_'.
+    deferred_rpcs_.pop();
+    const RowBatchHeaderPB& header = ctx->request->row_batch_header();
+    AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+  }
+
+  // Responds to the sender to ack the insertion of the row batches.
+  Status::OK().ToProto(ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
+}
+
+void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
+    unique_ptr<TransmitDataCtx> ctx) {
+  int sender_id = ctx->request->sender_id();
+  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+  {
+    lock_guard<SpinLock> l(lock_);
+    deferred_rpcs_.push(move(ctx));
+    ++num_deserialize_tasks_pending_;
+  }
+  recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
+      recvr_->dest_node_id(), sender_id, 1);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DecrementSenders() {
+  lock_guard<SpinLock> l(lock_);
+  DCHECK_GT(num_remaining_senders_, 0);
+  num_remaining_senders_ = max(0, num_remaining_senders_ - 1);
+  VLOG_FILE << "decremented senders: fragment_instance_id="
+            << recvr_->fragment_instance_id()
+            << " node_id=" << recvr_->dest_node_id()
+            << " #senders=" << num_remaining_senders_;
+  if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one();
+}
+
+void KrpcDataStreamRecvr::SenderQueue::Cancel() {
+  {
+    lock_guard<SpinLock> l(lock_);
+    if (is_cancelled_) return;
+    is_cancelled_ = true;
+
+    // Respond to deferred RPCs.
+    while (!deferred_rpcs_.empty()) {
+      const unique_ptr<TransmitDataCtx>& payload = deferred_rpcs_.front();
+      Status::OK().ToProto(payload->response->mutable_status());
+      payload->rpc_context->RespondSuccess();
+      deferred_rpcs_.pop();
+    }
+  }
+  VLOG_QUERY << "cancelled stream: fragment_instance_id_="
+             << recvr_->fragment_instance_id()
+             << " node_id=" << recvr_->dest_node_id();
+  // Wake up all threads waiting to produce/consume batches. They will all
+  // notice that the stream is cancelled and handle it.
+  data_arrival_cv_.notify_all();
+  PeriodicCounterUpdater::StopTimeSeriesCounter(
+      recvr_->bytes_received_time_series_counter_);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::Close() {
+  unique_lock<SpinLock> l(lock_);
+  // Note that the queue must be cancelled first before it can be closed or we may
+  // risk running into a race which can leak row batches. Please see IMPALA-3034.
+  DCHECK(is_cancelled_);
+
+  // Wait for any pending insertion to complete first.
+  while (num_pending_enqueue_ > 0) data_arrival_cv_.wait(l);
+
+  // Delete any batches queued in batch_queue_
+  batch_queue_.clear();
+  current_batch_.reset();
+}
+
+Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
+  DCHECK(is_merging_);
+  vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
+  input_batch_suppliers.reserve(sender_queues_.size());
+
+  // Create the merger that will produce a single stream of sorted rows.
+  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
+
+  for (SenderQueue* queue: sender_queues_) {
+    input_batch_suppliers.push_back(
+        [queue](RowBatch** next_batch) -> Status {
+          return queue->GetBatch(next_batch);
+        });
+  }
+
+  RETURN_IF_ERROR(merger_->Prepare(input_batch_suppliers));
+  return Status::OK();
 }
 
-[[noreturn]] Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
+  for (SenderQueue* sender_queue: sender_queues_) {
+    if (sender_queue->current_batch() != nullptr) {
+      sender_queue->current_batch()->TransferResourceOwnership(transfer_batch);
+    }
+  }
+}
+
+KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
+    MemTracker* parent_tracker, const RowDescriptor* row_desc,
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+    bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile)
+  : mgr_(stream_mgr),
+    fragment_instance_id_(fragment_instance_id),
+    dest_node_id_(dest_node_id),
+    total_buffer_limit_(total_buffer_limit),
+    row_desc_(row_desc),
+    is_merging_(is_merging),
+    num_buffered_bytes_(0),
+    profile_(profile),
+    recvr_side_profile_(profile_->CreateChild("RecvrSide")),
+    sender_side_profile_(profile_->CreateChild("SenderSide")) {
+  mem_tracker_.reset(new MemTracker(-1, "KrpcDataStreamRecvr", parent_tracker));
+  // Create one queue per sender if is_merging is true.
+  int num_queues = is_merging ? num_senders : 1;
+  sender_queues_.reserve(num_queues);
+  int num_sender_per_queue = is_merging ? 1 : num_senders;
+  for (int i = 0; i < num_queues; ++i) {
+    SenderQueue* queue =
+        sender_queue_pool_.Add(new SenderQueue(this, num_sender_per_queue));
+    sender_queues_.push_back(queue);
+  }
+
+  // Initialize the counters
+  bytes_received_counter_ =
+      ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
+  bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+      recvr_side_profile_, "BytesReceived", bytes_received_counter_);
+  queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
+  data_arrival_timer_ =
+      ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
+  first_batch_wait_total_timer_ =
+      ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
+  deserialize_row_batch_timer_ =
+      ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
+  inactive_timer_ = profile_->inactive_timer();
+  num_early_senders_ =
+      ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
+  num_deferred_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
+  num_accepted_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT);
 }
 
-[[noreturn]] Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
-  AbortUnsupportedFeature();
+Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
+  DCHECK(merger_.get() != nullptr);
+  return merger_->GetNext(output_batch, eos);
+}
+
+void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  int use_sender_id = is_merging_ ? request->sender_id() : 0;
+  // Add all batches to the same queue if is_merging_ is false.
+  sender_queues_[use_sender_id]->AddBatch(request, response, rpc_context);
+}
+
+void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) {
+  int use_sender_id = is_merging_ ? sender_id : 0;
+  // All deferred RPCs are in the same queue if is_merging_ is false.
+  sender_queues_[use_sender_id]->DequeueDeferredRpc();
+}
+
+void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
+  int use_sender_id = is_merging_ ? ctx->request->sender_id() : 0;
+  // Add all batches to the same queue if is_merging_ is false.
+  sender_queues_[use_sender_id]->TakeOverEarlySender(move(ctx));
+  COUNTER_ADD(num_early_senders_, 1);
+}
+
+void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
+  int use_sender_id = is_merging_ ? sender_id : 0;
+  sender_queues_[use_sender_id]->DecrementSenders();
+}
+
+void KrpcDataStreamRecvr::CancelStream() {
+  for (auto& queue: sender_queues_) queue->Cancel();
+}
+
+void KrpcDataStreamRecvr::Close() {
+  // Remove this receiver from the KrpcDataStreamMgr that created it.
+  // All the sender queues will be cancelled after this call returns.
+  const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
+  if (!status.ok()) {
+    LOG(ERROR) << "Error deregistering receiver: " << status.GetDetail();
+  }
+  mgr_ = nullptr;
+  for (auto& queue: sender_queues_) queue->Close();
+  merger_.reset();
+  mem_tracker_->Close();
+  recvr_side_profile_->StopPeriodicCounters();
+}
+
+KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {
+  DCHECK(mgr_ == nullptr) << "Must call Close()";
 }
 
-[[noreturn]] void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
-  AbortUnsupportedFeature();
+Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) {
+  DCHECK(!is_merging_);
+  DCHECK_EQ(sender_queues_.size(), 1);
+  return sender_queues_[0]->GetBatch(next_batch);
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index be46ae3..8bd99cf 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -20,28 +20,207 @@
 
 #include "data-stream-recvr-base.h"
 
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/object-pool.h"
 #include "common/status.h"
+#include "gen-cpp/Types_types.h"   // for TUniqueId
+#include "runtime/descriptors.h"
+#include "util/tuple-row-compare.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
 
 namespace impala {
 
+class KrpcDataStreamMgr;
+class MemTracker;
 class RowBatch;
-class TupleRowComparator;
+class RuntimeProfile;
+class SortedRunMerger;
+class TransmitDataCtx;
+class TransmitDataRequestPB;
+class TransmitDataResponsePB;
 
-/// TODO: Stub for the KRPC version of the DataStreamRecvr. Fill with actual
-/// implementation.
+/// Single receiver of an m:n data stream.
+///
+/// KrpcDataStreamRecvr maintains one or more queues of row batches received by a
+/// KrpcDataStreamMgr from one or more sender fragment instances. Receivers are created
+/// via KrpcDataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between
+/// the KrpcDataStreamMgr that created it and the caller of
+/// KrpcDataStreamMgr::CreateRecvr() (i.e. the exchange node).
+///
+/// The is_merging_ member determines if the recvr merges input streams from different
+/// sender fragment instances according to a specified sort order.
+/// If is_merging_ is false : Only one batch queue is maintained for row batches from all
+/// sender fragment instances. These row batches are returned one at a time via GetBatch()
+/// If is_merging_ is true : One queue is created for the batches from each distinct
+/// sender. A SortedRunMerger instance must be created via CreateMerger() prior to
+/// retrieving any rows from the receiver. Rows are retrieved from the receiver via
+/// GetNext(RowBatch* output_batch, bool* eos). After the final call to
+/// GetNext(), TransferAllResources() must be called to transfer resources from the input
+/// batches from each sender to the caller's output batch.
+/// The receiver sets deep_copy to false on the merger - resources are transferred from
+/// the input batches from each sender queue to the merger to the output batch by the
+/// merger itself as it processes each run.
+///
+/// KrpcDataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove
+/// the recvr instance from the tracking structure of its KrpcDataStreamMgr in all cases.
 class KrpcDataStreamRecvr : public DataStreamRecvrBase {
  public:
-  [[noreturn]] KrpcDataStreamRecvr();
-  virtual ~KrpcDataStreamRecvr() override;
+  ~KrpcDataStreamRecvr();
+
+  /// Returns next row batch in data stream; blocks if there aren't any.
+  /// Retains ownership of the returned batch. The caller must call TransferAllResources()
+  /// to acquire the resources from the returned batch before the next call to GetBatch().
+  /// A NULL returned batch indicates eos. Must only be called if is_merging_ is false.
+  /// TODO: This is currently only exposed to the non-merging version of the exchange.
+  /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
+  Status GetBatch(RowBatch** next_batch);
+
+  /// Deregister from KrpcDataStreamMgr instance, which shares ownership of this instance.
+  void Close();
+
+  /// Create a SortedRunMerger instance to merge rows from multiple senders according to
+  /// the specified row comparator. Fetches the first batches from the individual sender
+  /// queues. The exprs used in less_than must have already been prepared and opened.
+  Status CreateMerger(const TupleRowComparator& less_than);
+
+  /// Fill output_batch with the next batch of rows obtained by merging the per-sender
+  /// input streams. Must only be called if is_merging_ is true.
+  Status GetNext(RowBatch* output_batch, bool* eos);
+
+  /// Transfer all resources from the current batches being processed from each sender
+  /// queue to the specified batch.
+  void TransferAllResources(RowBatch* transfer_batch);
+
+  const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
+  PlanNodeId dest_node_id() const { return dest_node_id_; }
+  const RowDescriptor* row_desc() const { return row_desc_; }
+  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
+
+ private:
+  friend class KrpcDataStreamMgr;
+  class SenderQueue;
+
+  KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, MemTracker* parent_tracker,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int num_senders, bool is_merging,
+      int64_t total_buffer_limit, RuntimeProfile* profile);
+
+  /// Adds a new row batch to the appropriate sender queue. If the row batch can be
+  /// inserted, the RPC will be responded to before this function returns. If the batch
+  /// can't be added without exceeding the buffer limit, it is appended to a queue for
+  /// deferred processing. The RPC will be responded to when the row batch is deserialized
+  /// later.
+  void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* context);
+
+  /// Tries adding the first entry of 'deferred_rpcs_' queue for the sender queue
+  /// identified by 'sender_id'. If is_merging_ is false, it always defaults to
+  /// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id'.
+  void DequeueDeferredRpc(int sender_id);
+
+  /// Takes over the RPC state 'ctx' of an early sender for deferred processing and
+  /// kicks off a deserialization task to process it asynchronously. This makes sure new
+  /// incoming RPCs won't overtake the early senders, which would lead to starvation.
+  void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx);
+
+  /// Indicate that a particular sender is done. Delegated to the appropriate
+  /// sender queue. Called from KrpcDataStreamMgr.
+  void RemoveSender(int sender_id);
+
+  /// Marks all sender queues as cancelled and notifies all waiting consumers of
+  /// cancellation.
+  void CancelStream();
+
+  /// Return true if the addition of a new batch of size 'batch_size' would exceed the
+  /// total buffer limit.
+  bool ExceedsLimit(int64_t batch_size) {
+    return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
+  }
+
+  /// KrpcDataStreamMgr instance used to create this recvr. (Not owned)
+  KrpcDataStreamMgr* mgr_;
+
+  /// Fragment and node id of the destination exchange node this receiver is used by.
+  TUniqueId fragment_instance_id_;
+  PlanNodeId dest_node_id_;
+
+  /// Soft upper limit on the total amount of buffering in bytes allowed for this stream
+  /// across all sender queues. We defer processing of incoming RPCs once the amount of
+  /// buffered data exceeds this value.
+  const int64_t total_buffer_limit_;
+
+  /// Row schema.
+  const RowDescriptor* row_desc_;
+
+  /// True if this receiver merges incoming rows from different senders. Per-sender
+  /// row batch queues are maintained in this case.
+  bool is_merging_;
+
+  /// Total number of bytes held across all sender queues.
+  AtomicInt32 num_buffered_bytes_;
+
+  /// Memtracker for batches in the sender queue(s).
+  boost::scoped_ptr<MemTracker> mem_tracker_;
+
+  /// One or more queues of row batches received from senders. If is_merging_ is true,
+  /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
+  /// are placed in the same SenderQueue. The SenderQueue instances are owned by the
+  /// receiver and placed in sender_queue_pool_.
+  std::vector<SenderQueue*> sender_queues_;
+
+  /// SortedRunMerger used to merge rows from different senders.
+  boost::scoped_ptr<SortedRunMerger> merger_;
+
+  /// Pool of sender queues.
+  ObjectPool sender_queue_pool_;
+
+  /// Runtime profile storing the counters below.
+  RuntimeProfile* profile_;
+
+  /// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
+  /// and sender side measurements (from AddBatch()).
+  RuntimeProfile* recvr_side_profile_;
+  RuntimeProfile* sender_side_profile_;
+
+  /// Number of bytes received.
+  RuntimeProfile::Counter* bytes_received_counter_;
+
+  /// Time series of number of bytes received, samples bytes_received_counter_
+  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
+
+  /// Total wall-clock time spent deserializing row batches.
+  RuntimeProfile::Counter* deserialize_row_batch_timer_;
+
+  /// Number of senders which arrive before the receiver is ready.
+  RuntimeProfile::Counter* num_early_senders_;
+
+  /// Time spent waiting until the first batch arrives across all queues.
+  /// TODO: Turn this into a wall-clock timer.
+  RuntimeProfile::Counter* first_batch_wait_total_timer_;
+
+  /// Total number of batches received and deferred as sender queue is full.
+  RuntimeProfile::Counter* num_deferred_batches_;
+
+  /// Total number of batches received and accepted into the sender queue.
+  RuntimeProfile::Counter* num_accepted_batches_;
+
+  /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
+  RuntimeProfile::Counter* data_arrival_timer_;
 
-  [[noreturn]] Status GetBatch(RowBatch** next_batch) override;
-  [[noreturn]] void Close() override;
-  [[noreturn]] Status CreateMerger(const TupleRowComparator& less_than) override;
-  [[noreturn]] Status GetNext(RowBatch* output_batch, bool* eos) override;
-  [[noreturn]] void TransferAllResources(RowBatch* transfer_batch) override;
+  /// Pointer to profile's inactive timer.
+  RuntimeProfile::Counter* inactive_timer_;
 
+  /// Total time spent in SenderQueue::GetBatch().
+  RuntimeProfile::Counter* queue_get_batch_time_;
 };
 
 } // namespace impala
 
-#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H */
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
new file mode 100644
index 0000000..32e20cd
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/krpc-data-stream-sender.h"
+
+#include <boost/bind.hpp>
+
+#include <chrono>
+#include <condition_variable>
+#include <iostream>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/logging.h"
+#include "exec/kudu-util.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/rpc-mgr.inline.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/aligned-new.h"
+#include "util/debug-util.h"
+#include "util/network-util.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+#include "gen-cpp/data_stream_service.proxy.h"
+#include "gen-cpp/Types_types.h"
+
+#include "common/names.h"
+
+using std::condition_variable_any;
+using namespace apache::thrift;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
+using kudu::MonoDelta;
+
+DECLARE_int32(rpc_retry_interval_ms);
+
+namespace impala {
+
+// A datastream sender may send row batches to multiple destinations. There is one
+// channel for each destination.
+//
+// Clients can call TransmitData() to directly send a serialized row batch to the
+// destination or it can call AddRow() to accumulate rows in an internal row batch
+// to certain capacity before sending it. The underlying RPC layer is implemented
+// with KRPC, which provides interfaces for asynchronous RPC calls. Normally, the
+// calls above will return before the RPC has completed but they may block if there
+// is already an in-flight RPC.
+//
+// Each channel internally has two OutboundRowBatch to serialize to. They are reused
+// across multiple RPC calls. Having two OutboundRowBatch allows client to serialize
+// the next row batch while the current row batch is being sent. Upon completion of
+// a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC fails due to
+// remote service's queue being full, TransmitDataCompleteCb() will schedule the retry
+// callback RetryCb() after some delay derived from 'FLAGS_rpc_retry_interval_ms'.
+//
+// When a data stream sender is shut down, it will call Teardown() on all channels to
+// release resources. Teardown() will cancel any in-flight RPC and wait for the
+// completion callback to be called before returning. The execution thread is expected
+// to call FlushAndSendEos() before closing the data stream sender to flush all
+// buffered row batches and send the end-of-stream message to the remote receiver.
+// Note that the RPC payloads are owned solely by the channel and the KRPC layer will
+// relinquish references of them before the completion callback is invoked so it's
+// safe to free them once the callback has been invoked.
+//
+// Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client
+// has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the
+// right place, except that's currently called too early). RpcController::Cancel() ensures
+// that the callback is called only after the RPC layer no longer references the sidecar
+// buffers.
+class KrpcDataStreamSender::Channel : public CacheLineAligned {
+ public:
+  // Creates a channel to send data to particular ipaddress/port/fragment instance id/node
+  // combination. 'buffer_size' is in bytes and is not read by this constructor; Init()
+  // sizes 'batch_' from parent_->per_channel_buffer_size_, which only matters when rows
+  // are added via AddRow() and not sent directly via TransmitData().
+  Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
+      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int buffer_size)
+    : parent_(parent),
+      row_desc_(row_desc),
+      address_(destination),
+      fragment_instance_id_(fragment_instance_id),
+      dest_node_id_(dest_node_id) {
+    DCHECK(IsResolvedAddress(address_));
+  }
+
+  // Initializes the channel.
+  // Returns OK if successful, error indication otherwise.
+  Status Init(RuntimeState* state);
+
+  // Serializes the given row batch and send it to the destination. If the preceding
+  // RPC is in progress, this function may block until the previous RPC finishes.
+  // Return error status if serialization or the preceding RPC failed. Return OK
+  // otherwise.
+  Status SerializeAndSendBatch(RowBatch* batch);
+
+  // Transmits the serialized row batch 'outbound_batch'. This function may block if the
+  // preceding RPC is still in-flight. This is expected to be called from the fragment
+  // instance execution thread. Return error status if initialization of the RPC request
+  // parameters failed or if the preceding RPC failed. Returns OK otherwise.
+  Status TransmitData(const OutboundRowBatch* outbound_batch);
+
+  // Copies a single row into this channel's row batch and flushes the row batch once
+  // it reaches capacity. This call may block if the row batch's capacity is reached
+  // and the preceding RPC is still in progress. Returns error status if serialization
+  // failed or if the preceding RPC failed. Return OK otherwise.
+  Status AddRow(TupleRow* row);
+
+  // Shuts down the channel and frees the row batch allocation. Any in-flight RPC will
+  // be cancelled. It's expected that clients normally call FlushAndSendEos() before
+  // calling Teardown() to flush all buffered row batches to destinations. Teardown()
+  // may be called without FlushAndSendEos() in cases such as cancellation or error.
+  void Teardown(RuntimeState* state);
+
+  // Flushes any buffered row batches and sends the EOS RPC to close the channel.
+  // Return error status if either the last TransmitData() RPC or EOS RPC failed.
+  // This function blocks until the EOS RPC is complete.
+  Status FlushAndSendEos(RuntimeState* state);
+
+  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
+
+  // The type for an RPC worker function.
+  typedef boost::function<Status()> DoRpcFn;
+
+ private:
+  // The parent data stream sender owning this channel. Not owned.
+  KrpcDataStreamSender* parent_;
+
+  // The descriptor of the accumulated rows in 'batch_' below. Used for computing
+  // the capacity of 'batch_' and also when adding a row in AddRow().
+  const RowDescriptor* row_desc_;
+
+  // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
+  const TNetworkAddress address_;
+  const TUniqueId fragment_instance_id_;
+  const PlanNodeId dest_node_id_;
+
+  // Number of bytes of all serialized row batches sent successfully.
+  int64_t num_data_bytes_sent_ = 0;
+
+  // The row batch for accumulating rows copied from AddRow().
+  // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
+  scoped_ptr<RowBatch> batch_;
+
+  // The outbound row batches are double-buffered so that we can serialize the next
+  // batch while the other is still referenced by the in-flight RPC. Each entry contains
+  // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data.
+  //
+  // TODO: replace this with an actual queue. Schedule another RPC callback in the
+  // completion callback if the queue is not empty.
+  // TODO: rethink whether to keep per-channel buffers vs having all buffers in the
+  // datastream sender and sharing them across all channels. These buffers are not used in
+  // "UNPARTITIONED" scheme.
+  OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];
+
+  // Index into 'outbound_batches_' for the next available OutboundRowBatch to serialize
+  // into. This is read and written by the main execution thread.
+  int next_batch_idx_ = 0;
+
+  // Synchronize accesses to the following fields between the main execution thread and
+  // the KRPC reactor thread. Note that there should be only one reactor thread invoking
+  // the callbacks for a channel so there should be no races between multiple reactor
+  // threads. Protect all subsequent fields.
+  SpinLock lock_;
+
+  // 'lock_' needs to be held when accessing the following fields.
+  // The client interface for making RPC calls to the remote DataStreamService.
+  std::unique_ptr<DataStreamServiceProxy> proxy_;
+
+  // Controller for managing properties of a single RPC call (such as features required
+  // in the remote servers) and passing the payloads to the actual OutboundCall object.
+  RpcController rpc_controller_;
+
+  // Protobuf response buffer for TransmitData() RPC.
+  TransmitDataResponsePB resp_;
+
+  // Protobuf response buffer for EndDataStream() RPC.
+  EndDataStreamResponsePB eos_resp_;
+
+  // Signaled when the in-flight RPC completes.
+  condition_variable_any rpc_done_cv_;
+
+  // Status of the most recently completed RPC.
+  Status rpc_status_;
+
+  // The pointer to the current serialized row batch being sent.
+  const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;
+
+  // True if there is an in-flight RPC.
+  bool rpc_in_flight_ = false;
+
+  // True if the channel is being shut down or shut down already.
+  bool shutdown_ = false;
+
+  // True if the remote receiver is closed already. In which case, all rows would
+  // be dropped silently.
+  // TODO: Fix IMPALA-3990
+  bool remote_recvr_closed_ = false;
+
+  // Returns true if the channel should terminate because the parent sender
+  // has been closed or cancelled.
+  bool ShouldTerminate() const { return shutdown_ || parent_->state_->is_cancelled(); }
+
+  // Send the rows accumulated in the internal row batch. This will serialize the
+  // internal row batch before sending them to the destination. This may block if
+  // the preceding RPC is still in progress. Returns error status if serialization
+  // fails or if the preceding RPC fails.
+  Status SendCurrentBatch();
+
+  // Called when an RPC failed. If it turns out that the RPC failed because the
+  // remote server is too busy, this function will schedule RetryCb() to be called
+  // after FLAGS_rpc_retry_interval_ms milliseconds, which in turn re-invokes the RPC.
+  // Otherwise, it will call MarkDone() to mark the RPC as done and failed.
+  // 'controller_status' is a Kudu status returned from the KRPC layer.
+  // 'rpc_fn' is a worker function which initializes the RPC parameters and invokes
+  // the actual RPC when the RPC is rescheduled.
+  // 'err_msg' is an error message to be prepended to the status converted from the
+  // Kudu status 'controller_status'.
+  void HandleFailedRPC(const DoRpcFn& rpc_fn, const kudu::Status& controller_status,
+      const string& err_msg);
+
+  // Waits for the preceding RPC to complete. Expects to be called with 'lock_' held.
+  // May drop the lock while waiting for the RPC to complete. Return error status if
+  // the preceding RPC fails. Returns CANCELLED if the parent sender is cancelled or
+  // shut down. Returns OK otherwise. This should be only called from a fragment
+  // executor thread.
+  Status WaitForRpc(std::unique_lock<SpinLock>* lock);
+
+  // A callback function called from KRPC reactor thread to retry an RPC which failed
+  // previously due to remote server being too busy. This will re-arm the request
+  // parameters of the RPC. The retry may not happen if the callback has been aborted
+  // internally by KRPC code (e.g. the reactor thread was being shut down) or if the
+  // parent sender has been cancelled or closed since the scheduling of this callback.
+  // In which case, MarkDone() will be called with the error status and the RPC is
+  // considered complete. 'status' is the error status passed by KRPC code in case the
+  // callback was aborted.
+  void RetryCb(DoRpcFn rpc_fn, const kudu::Status& status);
+
+  // A callback function called from KRPC reactor threads upon completion of an in-flight
+  // TransmitData() RPC. This is called when the remote server responds to the RPC or
+  // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
+  // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
+  // response. HandleFailedRPC() is called to handle failed KRPC call. The RPC may be
+  // rescheduled if it's due to remote server being too busy.
+  void TransmitDataCompleteCb();
+
+  // Initializes the parameters for TransmitData() RPC and invokes the async RPC call.
+  // It will add 'tuple_offsets_' and 'tuple_data_' in 'rpc_in_flight_batch_' as sidecars
+  // to the RpcController and store the sidecars' indices to TransmitDataRequestPB sent as
+  // part of the RPC. Returns error status if adding sidecars to the RpcController failed.
+  Status DoTransmitDataRpc();
+
+  // A callback function called from KRPC reactor threads upon completion of an in-flight
+  // EndDataStream() RPC. This is called when the remote server responds to the RPC or
+  // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a
+  // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the
+  // response. HandleFailedRPC() is called to handle failed KRPC calls. The RPC may be
+  // rescheduled if it's due to remote server being too busy.
+  void EndDataStreamCompleteCb();
+
+  // Initializes the parameters for EndDataStream() RPC and invokes the async RPC call.
+  Status DoEndDataStreamRpc();
+
+  // Marks the in-flight RPC as completed, updates 'rpc_status_' with the status of the
+  // RPC (indicated in parameter 'status') and notifies any thread waiting for RPC
+  // completion. Expects to be called with 'lock_' held. Called in the context of a
+  // reactor thread.
+  void MarkDone(const Status& status);
+};
+
+Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
+  // Size 'batch_' so that it holds roughly one per-channel buffer's worth of rows, and
+  // always at least one row. The row size is clamped to 1 to avoid dividing by zero.
+  // TODO: take into account of var-len data at runtime.
+  const int row_size = max(row_desc_->GetRowSize(), 1);
+  const int capacity = max(1, parent_->per_channel_buffer_size_ / row_size);
+  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
+
+  // Create a DataStreamService proxy to the destination.
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address_, &proxy_));
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
+  // Clear the in-flight bookkeeping and record the outcome before waking up a thread
+  // blocked on 'rpc_done_cv_'.
+  rpc_in_flight_batch_ = nullptr;
+  rpc_in_flight_ = false;
+  rpc_status_ = status;
+  rpc_done_cv_.notify_one();
+}
+
+Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+
+  SCOPED_TIMER(parent_->state_->total_network_send_timer());
+
+  // Block in 50ms slices until the in-flight RPC completes, re-checking after every
+  // wakeup whether the parent sender has been closed or cancelled.
+  const auto poll_interval = std::chrono::milliseconds(50);
+  for (;;) {
+    if (!rpc_in_flight_ || ShouldTerminate()) break;
+    rpc_done_cv_.wait_for(*lock, poll_interval);
+  }
+
+  if (UNLIKELY(ShouldTerminate())) {
+    // DSS is single-threaded so it's impossible for shutdown_ to be true here.
+    DCHECK(!shutdown_);
+    return Status::CANCELLED;
+  }
+
+  // The preceding RPC is done: report its outcome.
+  DCHECK(!rpc_in_flight_);
+  if (LIKELY(rpc_status_.ok())) return Status::OK();
+  LOG(ERROR) << "channel send status: " << rpc_status_.GetDetail();
+  return rpc_status_;
+}
+
+void KrpcDataStreamSender::Channel::RetryCb(
+    DoRpcFn rpc_fn, const kudu::Status& cb_status) {
+  COUNTER_ADD(parent_->rpc_retry_counter_, 1);
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  if (UNLIKELY(!cb_status.ok())) {
+    // Aborted by KRPC layer as reactor thread was being shut down.
+    MarkDone(FromKuduStatus(cb_status, "KRPC retry failed"));
+  } else if (UNLIKELY(ShouldTerminate())) {
+    // Parent datastream sender has been closed or cancelled.
+    MarkDone(Status::CANCELLED);
+  } else {
+    // Retry the RPC. If it cannot even be issued, the RPC is complete and has failed.
+    const Status retry_status = rpc_fn();
+    if (UNLIKELY(!retry_status.ok())) MarkDone(retry_status);
+  }
+}
+
+void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
+    const kudu::Status& controller_status, const string& prepend) {
+  // Any failure other than a busy destination completes the RPC with an error.
+  // TODO: IMPALA-6159. Handle 'connection reset by peer' due to stale connections.
+  if (!RpcMgr::IsServerTooBusy(rpc_controller_)) {
+    MarkDone(FromKuduStatus(controller_status, prepend));
+    return;
+  }
+  // The destination is busy, so retry later. We don't call ShouldTerminate() here as
+  // this is always checked in RetryCb() anyway. RetryCb() is scheduled to be called in
+  // a reactor context.
+  const MonoDelta retry_delay = MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms);
+  ExecEnv::GetInstance()->rpc_mgr()->messenger()->ScheduleOnReactor(
+      boost::bind(&KrpcDataStreamSender::Channel::RetryCb, this, rpc_fn, _1),
+      retry_delay);
+}
+
+void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  const kudu::Status controller_status = rpc_controller_.status();
+  if (UNLIKELY(!controller_status.ok())) {
+    // The KRPC call itself failed: HandleFailedRPC() either schedules a retry of
+    // DoTransmitDataRpc() or marks the RPC as done and failed.
+    const DoRpcFn retry_fn =
+        boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this);
+    const string prepend =
+        Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_));
+    HandleFailedRPC(retry_fn, controller_status, prepend);
+    return;
+  }
+
+  // The KRPC call succeeded: interpret the status carried in the response.
+  const int32_t status_code = resp_.status().status_code();
+  Status rpc_status = Status::OK();
+  if (LIKELY(status_code == TErrorCode::OK)) {
+    DCHECK(rpc_in_flight_batch_ != nullptr);
+    num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
+    VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
+  } else if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
+    // Not reported as an error; TransmitData() stops sending once this flag is set.
+    remote_recvr_closed_ = true;
+  } else {
+    rpc_status = Status(resp_.status());
+  }
+  MarkDone(rpc_status);
+}
+
+Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
+  DCHECK(rpc_in_flight_batch_ != nullptr);
+  DCHECK(rpc_in_flight_batch_->IsInitialized());
+
+  // Initialize some constant fields in the request protobuf.
+  TransmitDataRequestPB req;
+  UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id();
+  finstance_id_pb->set_lo(fragment_instance_id_.lo);
+  finstance_id_pb->set_hi(fragment_instance_id_.hi);
+  req.set_sender_id(parent_->sender_id_);
+  req.set_dest_node_id(dest_node_id_);
+
+  rpc_controller_.Reset();
+  int sidecar_idx;
+  // Add 'tuple_offsets_' as sidecar.
+  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(RpcSidecar::FromSlice(
+      rpc_in_flight_batch_->TupleOffsetsAsSlice()), &sidecar_idx),
+      "Unable to add tuple offsets to sidecar");
+  req.set_tuple_offsets_sidecar_idx(sidecar_idx);
+
+  // Add 'tuple_data_' as sidecar.
+  KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
+      RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
+      "Unable to add tuple data to sidecar");
+  req.set_tuple_data_sidecar_idx(sidecar_idx);
+
+  // Set the RowBatchHeader in the request. This is done only after all the fallible
+  // steps above: 'req' takes ownership of the header and would delete it in its
+  // destructor, but the header still belongs to 'rpc_in_flight_batch_'. Attaching it
+  // before an early error return would free memory which 'req' doesn't own.
+  req.set_allocated_row_batch_header(
+      const_cast<RowBatchHeaderPB*>(rpc_in_flight_batch_->header()));
+
+  resp_.Clear();
+  proxy_->TransmitDataAsync(req, &resp_, &rpc_controller_,
+      boost::bind(&KrpcDataStreamSender::Channel::TransmitDataCompleteCb, this));
+  // 'req' took ownership of 'header'. Need to release its ownership or 'header' will be
+  // deleted by destructor.
+  req.release_row_batch_header();
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::TransmitData(
+    const OutboundRowBatch* outbound_batch) {
+  VLOG_ROW << "Channel::TransmitData() finst_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows=" << outbound_batch->header()->num_rows();
+  std::unique_lock<SpinLock> l(lock_);
+  // Block until the preceding RPC is done. Bail out if it failed or if the sender has
+  // been cancelled.
+  RETURN_IF_ERROR(WaitForRpc(&l));
+  DCHECK(!rpc_in_flight_);
+  DCHECK(rpc_in_flight_batch_ == nullptr);
+  // If the remote receiver is closed already, there is no point in sending anything.
+  // TODO: Needs better solution for IMPALA-3990 in the long run.
+  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+  rpc_in_flight_ = true;
+  rpc_in_flight_batch_ = outbound_batch;
+  const Status status = DoTransmitDataRpc();
+  if (UNLIKELY(!status.ok())) {
+    // No RPC was issued, so no completion callback will ever run to clear the in-flight
+    // state. Clear it here (as RetryCb() does) so that the next WaitForRpc() does not
+    // keep waiting for an RPC which doesn't exist.
+    MarkDone(status);
+  }
+  return status;
+}
+
+Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) {
+  // Serialize into the next free slot, which must not be the one referenced by the
+  // in-flight RPC.
+  OutboundRowBatch* const target = &outbound_batches_[next_batch_idx_];
+  DCHECK(target != rpc_in_flight_batch_);
+  RETURN_IF_ERROR(parent_->SerializeBatch(batch, target));
+  RETURN_IF_ERROR(TransmitData(target));
+  // Move on to the other slot only once TransmitData() has returned OK.
+  const int following_idx = next_batch_idx_ + 1;
+  next_batch_idx_ = following_idx % NUM_OUTBOUND_BATCHES;
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
+  // Serialize and send the accumulated rows, then reset 'batch_' for reuse. On error,
+  // 'batch_' is left untouched.
+  RowBatch* const pending = batch_.get();
+  RETURN_IF_ERROR(SerializeAndSendBatch(pending));
+  pending->Reset();
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+  // Flush first if 'batch_' has no room left for another row.
+  if (batch_->AtCapacity()) {
+    RETURN_IF_ERROR(SendCurrentBatch());
+  }
+  const int row_idx = batch_->AddRow();
+  TupleRow* dest = batch_->GetRow(row_idx);
+  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
+  // Deep-copy each non-null tuple of 'row' into memory from 'batch_''s tuple data pool.
+  for (int i = 0; i < descs.size(); ++i) {
+    auto* src_tuple = row->GetTuple(i);
+    if (UNLIKELY(src_tuple == nullptr)) {
+      dest->SetTuple(i, nullptr);
+      continue;
+    }
+    dest->SetTuple(i, src_tuple->DeepCopy(*descs[i], batch_->tuple_data_pool()));
+  }
+  batch_->CommitLastRow();
+  return Status::OK();
+}
+
+// Completion callback of the EndDataStream() RPC. If the RPC itself succeeded, the
+// status carried in the response is passed to MarkDone(). Otherwise the failure is
+// handed to HandleFailedRPC() together with a closure that re-issues the RPC.
+void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  const kudu::Status rpc_status = rpc_controller_.status();
+  if (UNLIKELY(!rpc_status.ok())) {
+    const string& err_prefix =
+        Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_));
+    DoRpcFn retry_fn =
+        boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
+    HandleFailedRPC(retry_fn, rpc_status, err_prefix);
+    return;
+  }
+  MarkDone(Status(eos_resp_.status()));
+}
+
+// Builds an EndDataStream() request for this channel's destination and sends it
+// asynchronously; EndDataStreamCompleteCb() is registered as the completion callback.
+// Expects 'rpc_in_flight_' to have been set by the caller. Always returns OK.
+Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
+  DCHECK(rpc_in_flight_);
+  // Start from a clean controller and response for this attempt.
+  rpc_controller_.Reset();
+  eos_resp_.Clear();
+
+  EndDataStreamRequestPB request;
+  UniqueIdPB* dest_finst_id = request.mutable_dest_fragment_instance_id();
+  dest_finst_id->set_hi(fragment_instance_id_.hi);
+  dest_finst_id->set_lo(fragment_instance_id_.lo);
+  request.set_dest_node_id(dest_node_id_);
+  request.set_sender_id(parent_->sender_id_);
+
+  proxy_->EndDataStreamAsync(request, &eos_resp_, &rpc_controller_,
+      boost::bind(&KrpcDataStreamSender::Channel::EndDataStreamCompleteCb, this));
+  return Status::OK();
+}
+
+// Sends any rows still buffered in 'batch_', then issues the EndDataStream() RPC and
+// waits for it to complete. Returns OK without sending EOS if the remote receiver is
+// already closed. 'state' is not used by this function.
+Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
+  VLOG_RPC << "Channel::FlushAndSendEos() instance_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows= " << batch_->num_rows();
+
+  // Returning an error before the EOS RPC is sent is fine: the error is reported to
+  // the coordinator, which then cancels all the remote fragments, including the one
+  // this channel is sending to.
+  const bool has_buffered_rows = batch_->num_rows() > 0;
+  if (has_buffered_rows) RETURN_IF_ERROR(SendCurrentBatch());
+
+  std::unique_lock<SpinLock> l(lock_);
+  // Let any RPC that is still in flight finish before sending EOS.
+  RETURN_IF_ERROR(WaitForRpc(&l));
+  DCHECK(!rpc_in_flight_);
+  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+  VLOG_RPC << "calling EndDataStream() to terminate channel.";
+  rpc_in_flight_ = true;
+  RETURN_IF_ERROR(DoEndDataStreamRpc());
+  // Block until the EOS RPC itself is done and report its outcome.
+  return WaitForRpc(&l);
+}
+
+// Shuts the channel down: sets 'shutdown_', cancels an in-flight RPC (if any) and
+// waits until it is no longer in flight, then frees the accumulating row batch.
+// 'state' is not used by this function.
+void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
+  // FlushAndSendEos() normally runs before Teardown(), in which case all the data has
+  // been drained already. If the fragment was closed or cancelled instead, an RPC may
+  // still be in flight and rows may still be buffered.
+  std::unique_lock<SpinLock> l(lock_);
+  shutdown_ = true;
+  if (rpc_in_flight_) {
+    // Ask for cancellation, then wait until 'rpc_in_flight_' has been cleared.
+    rpc_controller_.Cancel();
+    while (rpc_in_flight_) {
+      rpc_done_cv_.wait(l);
+    }
+  }
+  batch_.reset();
+}
+
+// Creates one Channel per entry in 'destinations'. For UNPARTITIONED and RANDOM
+// partitioning the channel order is shuffled so that senders do not all start with the
+// same destination.
+KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
+    int per_channel_buffer_size)
+  : DataSink(row_desc),
+    sender_id_(sender_id),
+    partition_type_(sink.output_partition.type),
+    per_channel_buffer_size_(per_channel_buffer_size),
+    dest_node_id_(sink.dest_node_id),
+    next_unknown_partition_(0) {
+  DCHECK_GT(destinations.size(), 0);
+  DCHECK(partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::RANDOM
+      || partition_type_ == TPartitionType::KUDU);
+
+  // The channels are heap-allocated here and deleted in the destructor.
+  for (const TPlanFragmentDestination& dest : destinations) {
+    channels_.push_back(new Channel(this, row_desc, dest.krpc_server,
+        dest.fragment_instance_id, sink.dest_node_id, per_channel_buffer_size));
+  }
+
+  const bool shuffle_channels = partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM;
+  if (shuffle_channels) {
+    // Randomize the order we open/transmit to channels to avoid thundering herd problems.
+    srand(reinterpret_cast<uint64_t>(this));
+    random_shuffle(channels_.begin(), channels_.end());
+  }
+}
+
+// Returns the name of this sink, which includes the id of the destination plan node.
+string KrpcDataStreamSender::GetName() {
+  string name = Substitute("KrpcDataStreamSender (dst_id=$0)", dest_node_id_);
+  return name;
+}
+
+// Deletes the channels that were allocated in the constructor.
+KrpcDataStreamSender::~KrpcDataStreamSender() {
+  // TODO: check that sender was either already closed() or there was an error
+  // on some channel
+  for (Channel* channel : channels_) delete channel;
+}
+
+// Creates the partitioning expressions from the partition exprs carried in 'tsink'.
+// This is only done for HASH_PARTITIONED and KUDU partitioning; for other partition
+// types nothing is created. 'thrift_output_exprs' is not used by this function.
+Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  DCHECK(tsink.__isset.stream_sink);
+  const bool needs_partition_exprs =
+      partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::KUDU;
+  if (!needs_partition_exprs) return Status::OK();
+  return ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
+      *row_desc_, state, &partition_exprs_);
+}
+
+// Prepares the sink: runs DataSink::Prepare(), creates the evaluators for the
+// partitioning expressions, registers the profile counters and initializes every
+// channel. Returns the first error encountered.
+Status KrpcDataStreamSender::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  state_ = state;
+  SCOPED_TIMER(profile_->total_time_counter());
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_expr_evals_));
+
+  // Counters registered with this sink's runtime profile.
+  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
+  rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
+  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  uncompressed_bytes_counter_ =
+      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+  total_sent_rows_counter_ = ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
+  // Derived from 'bytes_sent_counter_' and the profile's total time counter.
+  overall_throughput_ = profile()->AddDerivedCounter(
+      "OverallThroughput", TUnit::BYTES_PER_SECOND,
+      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
+          profile()->total_time_counter()));
+
+  for (Channel* channel : channels_) RETURN_IF_ERROR(channel->Init(state));
+  return Status::OK();
+}
+
+// Opens the evaluators of the partitioning expressions and returns the resulting
+// status.
+Status KrpcDataStreamSender::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_expr_evals_, state));
+  return Status::OK();
+}
+
+// Routes the rows of 'batch' to the channels according to 'partition_type_':
+//  - UNPARTITIONED: the batch is serialized once and transmitted to every channel.
+//  - RANDOM (or only one channel): whole batches are sent round-robin across channels.
+//  - KUDU: each row goes to the channel selected by the single partition expression;
+//    rows for which it yields a negative value are spread round-robin.
+//  - HASH_PARTITIONED: each row goes to the channel selected by hashing the values of
+//    the partition expressions.
+// An empty batch is a no-op. Must not be called after FlushFinal() or Close(). Returns
+// an error if serialization or transmission failed, or if CheckQueryState() reports
+// one.
+Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+  DCHECK(!closed_);
+  DCHECK(!flushed_);
+
+  if (batch->num_rows() == 0) return Status::OK();
+  if (partition_type_ == TPartitionType::UNPARTITIONED) {
+    OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
+    // The batch is serialized once but transmitted to every channel, so it has to be
+    // accounted for once per receiver in the byte counters.
+    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
+    // TransmitData() will block if there are still in-flight rpcs (and those will
+    // reference the previously written serialized batch).
+    for (int i = 0; i < channels_.size(); ++i) {
+      RETURN_IF_ERROR(channels_[i]->TransmitData(outbound_batch));
+    }
+    next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+  } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
+    // Round-robin batches among channels. Wait for the current channel to finish its
+    // rpc before overwriting its batch.
+    Channel* current_channel = channels_[current_channel_idx_];
+    RETURN_IF_ERROR(current_channel->SerializeAndSendBatch(batch));
+    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
+  } else if (partition_type_ == TPartitionType::KUDU) {
+    DCHECK_EQ(partition_expr_evals_.size(), 1);
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      int32_t partition =
+          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+      if (partition < 0) {
+        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
+        // Spread such rows round-robin. The cursor is kept within [0, num_channels) so
+        // it cannot overflow into a negative channel index, however many such rows
+        // are seen; the resulting channel sequence is unchanged.
+        partition = next_unknown_partition_;
+        next_unknown_partition_ = (next_unknown_partition_ + 1) % num_channels;
+      }
+      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
+    }
+  } else {
+    DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
+    // hash-partition batch's rows across channels
+    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
+    // once we have codegen here.
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      uint64_t hash_val = EXCHANGE_HASH_SEED;
+      for (int j = 0; j < partition_exprs_.size(); ++j) {
+        ScalarExprEvaluator* eval = partition_expr_evals_[j];
+        void* partition_val = eval->GetValue(row);
+        // We can't use the crc hash function here because it does not result in
+        // uncorrelated hashes with different seeds. Instead we use FastHash.
+        // TODO: fix crc hash/GetHashValue()
+        DCHECK(&(eval->root()) == partition_exprs_[j]);
+        hash_val = RawValue::GetHashValueFastHash(
+            partition_val, partition_exprs_[j]->type(), hash_val);
+      }
+      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
+    }
+  }
+  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
+  expr_results_pool_->Clear();
+  RETURN_IF_ERROR(state->CheckQueryState());
+  return Status::OK();
+}
+
+// Flushes every channel and sends end-of-stream to its destination, stopping at the
+// first channel that reports an error. Must be called at most once and before Close().
+Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+  DCHECK(!flushed_);
+  DCHECK(!closed_);
+  flushed_ = true;
+  // Stopping at the first failing channel is fine: the error is propagated back to the
+  // coordinator, which in turn cancels the query, which will cause the remaining open
+  // channels to be closed.
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->FlushAndSendEos(state));
+  }
+  return Status::OK();
+}
+
+// Tears down all channels, closes the partitioning expressions and their evaluators,
+// and closes the base DataSink. Calls after the first one return immediately.
+void KrpcDataStreamSender::Close(RuntimeState* state) {
+  if (closed_) return;
+  for (Channel* channel : channels_) channel->Teardown(state);
+  ScalarExprEvaluator::Close(partition_expr_evals_, state);
+  ScalarExpr::Close(partition_exprs_);
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+// Serializes 'src' into 'dest' and adds the serialized and deserialized sizes, each
+// multiplied by 'num_receivers', to the byte counters. The time spent is charged to
+// the total time counter and to 'serialize_batch_timer_'.
+Status KrpcDataStreamSender::SerializeBatch(
+    RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
+  VLOG_ROW << "serializing " << src->num_rows() << " rows";
+  {
+    SCOPED_TIMER(profile_->total_time_counter());
+    SCOPED_TIMER(serialize_batch_timer_);
+    RETURN_IF_ERROR(src->Serialize(dest));
+    const int64_t serialized_size = RowBatch::GetSerializedSize(*dest);
+    const int64_t deserialized_size = RowBatch::GetDeserializedSize(*dest);
+    COUNTER_ADD(uncompressed_bytes_counter_, deserialized_size * num_receivers);
+    COUNTER_ADD(bytes_sent_counter_, serialized_size * num_receivers);
+  }
+  return Status::OK();
+}
+
+// Returns the sum of num_data_bytes_sent() over all channels.
+int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
+  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
+  // atomic?
+  int64_t total_bytes = 0;
+  for (Channel* channel : channels_) {
+    total_bytes += channel->num_data_bytes_sent();
+  }
+  return total_bytes;
+}
+
+} // namespace impala
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
new file mode 100644
index 0000000..1a8b30f
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+#define IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+
+#include <vector>
+#include <string>
+
+#include "exec/data-sink.h"
+#include "common/global-types.h"
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "runtime/row-batch.h"
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+class RowDescriptor;
+class MemTracker;
+class TDataStreamSink;
+class TNetworkAddress;
+class TPlanFragmentDestination;
+
+/// Single sender of an m:n data stream.
+///
+/// Row batch data is routed to destinations based on the provided partitioning
+/// specification.
+/// *Not* thread-safe.
+///
+/// TODO: capture stats that describe distribution of rows/data volume
+/// across channels.
+/// TODO: create a PlanNode equivalent class for DataSink.
+class KrpcDataStreamSender : public DataSink {
+ public:
+  /// Construct a sender according to the output specification (tsink), sending to the
+  /// given destinations:
+  /// 'sender_id' identifies this sender instance, and is unique within a fragment.
+  /// 'row_desc' is the descriptor of the tuple row. It must out-live the sink.
+  /// 'destinations' are the receivers' network addresses. There is one channel for each
+  /// destination.
+  /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
+  /// per-channel's accumulating row batch before it will be sent.
+  /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
+  /// RANDOM and KUDU.
+  KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+      const TDataStreamSink& tsink,
+      const std::vector<TPlanFragmentDestination>& destinations,
+      int per_channel_buffer_size);
+
+  virtual ~KrpcDataStreamSender();
+
+  virtual std::string GetName();
+
+  /// Prepare the sender: create the partition expr evaluators, allocate the stat
+  /// counters and initialize all the channels. Return error status if any step failed.
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+
+  /// Open the evaluators of the partitioning expressions. Return error status
+  /// if opening them failed.
+  virtual Status Open(RuntimeState* state);
+
+  /// Flush all buffered data and send end-of-stream on all channels to destination hosts.
+  /// Further Send() calls are illegal after calling FlushFinal(). It is legal to call
+  /// FlushFinal() no more than once. Return error status if Send() failed or the end
+  /// of stream call failed.
+  virtual Status FlushFinal(RuntimeState* state);
+
+  /// Send data in 'batch' to destination nodes according to partitioning
+  /// specification provided in c'tor.
+  /// Blocks until all rows in batch are placed in their appropriate outgoing
+  /// buffers (ie, blocks if there are still in-flight rpcs from the last
+  /// Send() call).
+  virtual Status Send(RuntimeState* state, RowBatch* batch);
+
+  /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are
+  /// illegal after calling Close().
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  friend class DataStreamTest;
+
+  /// Create the partitioning expressions from the partition exprs in 'tsink' and store
+  /// them in 'partition_exprs_'. Returns error status if the initialization failed.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state);
+
+  /// Return total number of bytes sent. If batches are broadcast to multiple receivers,
+  /// they are counted once per receiver.
+  int64_t GetNumDataBytesSent() const;
+
+ private:
+  class Channel;
+
+  /// Serializes the src batch into the serialized row batch 'dest' and updates
+  /// various stat counters.
+  /// 'num_receivers' is the number of receivers this batch will be sent to. Used for
+  /// updating the stat counters.
+  Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int num_receivers = 1);
+
+  /// Sender instance id, unique within a fragment.
+  const int sender_id_;
+
+  /// The type of partitioning to perform.
+  const TPartitionType::type partition_type_;
+
+  /// Amount of per-channel buffering for rows before sending them to the destination.
+  const int per_channel_buffer_size_;
+
+  /// RuntimeState of the fragment instance.
+  RuntimeState* state_ = nullptr;
+
+  /// Index of the next channel to send to when whole batches are sent round-robin.
+  int current_channel_idx_ = 0;
+
+  /// Index of the next OutboundRowBatch to use for serialization.
+  int next_batch_idx_ = 0;
+
+  /// The outbound row batches are double-buffered so that we can serialize the next
+  /// batch while the other is still referenced by the in-flight RPC. Each entry contains
+  /// a RowBatchHeaderPB and buffers for the serialized tuple offsets and data. Used only
+  /// when the partitioning strategy is UNPARTITIONED.
+  static const int NUM_OUTBOUND_BATCHES = 2;
+  OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];
+
+  /// If true, FlushFinal() has been called, whether or not it succeeded.
+  /// Not valid to call Send() anymore.
+  bool flushed_ = false;
+
+  /// If true, this sender has been closed. Not valid to call Send() anymore.
+  bool closed_ = false;
+
+  /// List of all channels. One for each destination.
+  std::vector<Channel*> channels_;
+
+  /// Expressions of partition keys and their evaluators. They are used to compute the
+  /// per-row partition values for shuffling exchange.
+  std::vector<ScalarExpr*> partition_exprs_;
+  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
+
+  /// Time for serializing row batches.
+  RuntimeProfile::Counter* serialize_batch_timer_ = nullptr;
+
+  /// Number of TransmitData() RPC retries due to remote service being busy.
+  RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
+
+  /// Total number of bytes sent.
+  RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
+
+  /// Total number of bytes of the row batches before compression.
+  RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
+
+  /// Total number of rows sent.
+  RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
+
+  /// Throughput per total time spent in sender.
+  RuntimeProfile::Counter* overall_throughput_ = nullptr;
+
+  /// Identifier of the destination plan node.
+  PlanNodeId dest_node_id_;
+
+  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
+  /// or when errors are encountered.
+  int next_unknown_partition_;
+
+  /// An arbitrary hash seed used for exchanges.
+  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
+};
+
+} // namespace impala
+
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H