Posted to commits@impala.apache.org by ta...@apache.org on 2019/11/19 23:44:23 UTC

[impala] branch master updated: IMPALA-9134: pipeline data stream sender flush

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 66322f2  IMPALA-9134: pipeline data stream sender flush
66322f2 is described below

commit 66322f27e36f3c322bfa78726e9742ff110e96fd
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Nov 14 15:09:23 2019 -0800

    IMPALA-9134: pipeline data stream sender flush
    
    This changes the logic so that the last TransmitData
    and EndDataStream RPCs can be sent to receivers in
    parallel, instead of them being serialized in FlushAndSendEos().
    
    This was a problem because each call to FlushAndSendEos() would
    wait for one or two network round-trips before the next channel
    was flushed, so previously the flush could easily take the sum of
    2 * RTT across all channels. After this change the sending and
    waiting for RPCs are pipelined, so the time taken should be
    closer to 2 * RTT in total.
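    
    The sketch below is a minimal standalone illustration of the timing
    difference, not Impala code: Channel, SendAsync() and kRtt are simplified
    stand-ins for KrpcDataStreamSender::Channel and its FlushBatches(),
    SendEosAsync() and WaitForRpc() calls, assuming a fixed 50 ms round trip.
    
    // Illustrative sketch only: models why the serial flush costs roughly
    // num_channels * 2 * RTT while the pipelined flush stays near 2 * RTT.
    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>
    #include <vector>
    
    using namespace std::chrono;
    constexpr auto kRtt = milliseconds(50);  // assumed network round-trip time
    
    struct Channel {
      std::future<void> inflight;
      // Starts an "RPC" that completes one RTT later without blocking the caller.
      void SendAsync() {
        inflight = std::async(std::launch::async,
            [] { std::this_thread::sleep_for(kRtt); });
      }
      // Blocks until the in-flight RPC, if any, has completed.
      void WaitForRpc() {
        if (inflight.valid()) inflight.get();
      }
    };
    
    int main() {
      std::vector<Channel> channels(8);
    
      // Old pattern: send and wait per channel (TransmitData, then EOS), so the
      // total is the sum of the per-channel round trips (~8 * 2 * RTT here).
      auto start = steady_clock::now();
      for (Channel& ch : channels) {
        ch.SendAsync();
        ch.WaitForRpc();
        ch.SendAsync();
        ch.WaitForRpc();
      }
      std::cout << "serial:    "
                << duration_cast<milliseconds>(steady_clock::now() - start).count()
                << " ms" << std::endl;
    
      // New pattern: start all sends, then wait on all channels; repeat for EOS.
      // The waits overlap, so the total stays near 2 * RTT as channels grow.
      start = steady_clock::now();
      for (Channel& ch : channels) ch.SendAsync();
      for (Channel& ch : channels) ch.WaitForRpc();
      for (Channel& ch : channels) ch.SendAsync();  // stands in for SendEosAsync()
      for (Channel& ch : channels) ch.WaitForRpc();
      std::cout << "pipelined: "
                << duration_cast<milliseconds>(steady_clock::now() - start).count()
                << " ms" << std::endl;
      return 0;
    }
    
    The same shape is used by FlushFinal() in the patch: flush batches on every
    channel, wait on every channel, send EOS on every channel, then wait again.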
    
    Testing:
    This code is exercised by existing tests. Ran exhaustive tests.
    
    Perf:
    No measurable change on local TPC-H (where network latency isn't a
    significant factor).
    
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCH(30) | parquet / none / none | 7.64    | -1.37%     | 5.00       | +0.29%         |
    +----------+-----------------------+---------+------------+------------+----------------+
    
    +----------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    | Workload | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
    +----------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    | TPCH(30) | TPCH-Q14 | parquet / none / none | 3.75   | 3.39        |   +10.48%  | * 19.43% * |   8.88%        | 7     |   +7.26%       | 0.48    | 1.19  |
    | TPCH(30) | TPCH-Q13 | parquet / none / none | 13.55  | 12.78       |   +6.05%   |   5.43%    |   4.75%        | 7     |   +5.09%       | 1.76    | 2.14  |
    | TPCH(30) | TPCH-Q2  | parquet / none / none | 1.30   | 1.26        |   +3.63%   |   5.53%    |   3.27%        | 7     |   +3.93%       | 1.28    | 1.46  |
    | TPCH(30) | TPCH-Q12 | parquet / none / none | 2.65   | 2.56        |   +3.44%   |   4.23%    |   2.92%        | 7     |   +3.68%       | 1.28    | 1.73  |
    | TPCH(30) | TPCH-Q15 | parquet / none / none | 3.80   | 3.76        |   +0.91%   |   0.81%    |   1.80%        | 7     |   +1.42%       | 1.12    | 1.22  |
    | TPCH(30) | TPCH-Q8  | parquet / none / none | 5.23   | 5.13        |   +2.11%   |   5.41%    |   0.48%        | 7     |   +0.04%       | 0.16    | 1.01  |
    | TPCH(30) | TPCH-Q10 | parquet / none / none | 5.77   | 5.62        |   +2.70%   |   9.84%    |   2.79%        | 7     |   -0.80%       | -0.96   | 0.68  |
    | TPCH(30) | TPCH-Q21 | parquet / none / none | 28.82  | 28.62       |   +0.70%   |   0.59%    |   0.37%        | 7     |   +0.68%       | 1.92    | 2.67  |
    | TPCH(30) | TPCH-Q6  | parquet / none / none | 1.32   | 1.31        |   +0.82%   |   0.36%    |   1.53%        | 7     |   +0.23%       | 1.28    | 1.38  |
    | TPCH(30) | TPCH-Q11 | parquet / none / none | 1.30   | 1.29        |   +0.60%   |   5.11%    |   2.94%        | 7     |   +0.14%       | 0.16    | 0.27  |
    | TPCH(30) | TPCH-Q19 | parquet / none / none | 4.52   | 4.49        |   +0.57%   |   2.98%    |   1.58%        | 7     |   +0.12%       | 0.32    | 0.44  |
    | TPCH(30) | TPCH-Q16 | parquet / none / none | 2.78   | 2.77        |   +0.35%   |   3.21%    |   2.02%        | 7     |   +0.17%       | 0.64    | 0.25  |
    | TPCH(30) | TPCH-Q3  | parquet / none / none | 5.65   | 5.65        |   -0.04%   |   1.62%    |   1.91%        | 7     |   -0.06%       | -0.32   | -0.04 |
    | TPCH(30) | TPCH-Q1  | parquet / none / none | 10.88  | 10.89       |   -0.11%   |   0.76%    |   0.90%        | 7     |   +0.01%       | 0.00    | -0.26 |
    | TPCH(30) | TPCH-Q4  | parquet / none / none | 2.18   | 2.18        |   -0.38%   |   1.33%    |   2.48%        | 7     |   -0.04%       | -0.16   | -0.36 |
    | TPCH(30) | TPCH-Q5  | parquet / none / none | 3.94   | 3.95        |   -0.27%   |   2.37%    |   1.33%        | 7     |   -0.26%       | -1.12   | -0.26 |
    | TPCH(30) | TPCH-Q7  | parquet / none / none | 22.37  | 22.41       |   -0.19%   |   2.94%    |   1.92%        | 7     |   -0.66%       | -0.80   | -0.15 |
    | TPCH(30) | TPCH-Q22 | parquet / none / none | 3.68   | 3.72        |   -1.14%   |   4.95%    |   3.99%        | 7     |   +0.13%       | 0.00    | -0.48 |
    | TPCH(30) | TPCH-Q20 | parquet / none / none | 2.83   | 2.86        |   -1.21%   |   2.69%    |   2.20%        | 7     |   -1.60%       | -0.64   | -0.93 |
    | TPCH(30) | TPCH-Q17 | parquet / none / none | 4.92   | 5.04        |   -2.46%   |   3.11%    |   2.63%        | 7     |   -2.00%       | -1.44   | -1.62 |
    | TPCH(30) | TPCH-Q9  | parquet / none / none | 16.32  | 17.19       |   -5.03%   |   2.76%    |   4.56%        | 7     |   -4.00%       | -2.24   | -2.53 |
    | TPCH(30) | TPCH-Q18 | parquet / none / none | 20.45  | 23.45       |   -12.82%  |   5.49%    | * 23.11% *     | 7     |   -4.89%       | -0.96   | -1.44 |
    +----------+----------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
    
    Change-Id: If66add99e4370f8faf22761b336205cc9f9f1867
    Reviewed-on: http://gerrit.cloudera.org:8080/14717
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/krpc-data-stream-sender.cc | 92 +++++++++++++++++++++----------
 1 file changed, 62 insertions(+), 30 deletions(-)

diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4291982..972925c 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -91,8 +91,10 @@ const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 // When a data stream sender is shut down, it will call Teardown() on all channels to
 // release resources. Teardown() will cancel any in-flight RPC and wait for the
 // completion callback to be called before returning. It's expected that the execution
-// thread to call FlushAndSendEos() before closing the data stream sender to flush all
-// buffered row batches and send the end-of-stream message to the remote receiver.
+// thread will flush all buffered row batches and send the end-of-stream message (by
+// calling FlushBatches(), SendEosAsync() and WaitForRpc()) before closing the data
+// stream sender.
+//
 // Note that the RPC payloads are owned solely by the channel and the KRPC layer will
 // relinquish references of them before the completion callback is invoked so it's
 // safe to free them once the callback has been invoked.
@@ -143,15 +145,26 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   Status ALWAYS_INLINE AddRow(TupleRow* row);
 
   // Shuts down the channel and frees the row batch allocation. Any in-flight RPC will
-  // be cancelled. It's expected that clients normally call FlushAndSendEos() before
-  // calling Teardown() to flush all buffered row batches to destinations. Teardown()
-  // may be called without FlushAndSendEos() in cases such as cancellation or error.
+  // be cancelled. It's expected that clients normally call FlushBatches(), SendEosAsync()
+  // and WaitForRpc() before calling Teardown() to flush all buffered row batches to
+  // destinations. Teardown() may be called without flushing the channel in cases such
+  // as cancellation or error.
   void Teardown(RuntimeState* state);
 
-  // Flushes any buffered row batches and sends the EOS RPC to close the channel.
-  // Return error status if either the last TransmitData() RPC or EOS RPC failed.
-  // This function blocks until the EOS RPC is complete.
-  Status FlushAndSendEos(RuntimeState* state);
+  // Flushes any buffered row batches. Return error status if the TransmitData() RPC
+  // fails. The RPC is sent asynchronously. WaitForRpc() must be called to wait
+  // for the RPC. This should be only called from a fragment executor thread.
+  Status FlushBatches();
+
+  // Sends the EOS RPC to close the channel. Return error status if sending the EOS RPC
+  // fails. The RPC is sent asynchronously. WaitForRpc() must be called to wait for
+  // the RPC. This should be only called from a fragment executor thread.
+  Status SendEosAsync();
+
+  // Waits for the preceding RPC to complete. Return error status if the preceding
+  // RPC fails. Returns CANCELLED if the parent sender is cancelled or shut down.
+  // Returns OK otherwise. This should be only called from a fragment executor thread.
+  Status WaitForRpc();
 
   // The type for a RPC worker function.
   typedef boost::function<Status()> DoRpcFn;
@@ -254,12 +267,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   void HandleFailedRPC(const DoRpcFn& rpc_fn, const kudu::Status& controller_status,
       const string& err_msg);
 
-  // Waits for the preceding RPC to complete. Expects to be called with 'lock_' held.
-  // May drop the lock while waiting for the RPC to complete. Return error status if
-  // the preceding RPC fails. Returns CANCELLED if the parent sender is cancelled or
-  // shut down. Returns OK otherwise. This should be only called from a fragment
-  // executor thread.
-  Status WaitForRpc(std::unique_lock<SpinLock>* lock);
+  // Same as WaitForRpc() except expects to be called with 'lock_' held and
+  // may drop the lock while waiting for the RPC to complete.
+  Status WaitForRpcLocked(std::unique_lock<SpinLock>* lock);
 
   // A callback function called from KRPC reactor thread to retry an RPC which failed
   // previously due to remote server being too busy. This will re-arm the request
@@ -359,7 +369,12 @@ void KrpcDataStreamSender::Channel::LogSlowFailedRpc(
             << "Error: " << err.ToString();
 }
 
-Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
+Status KrpcDataStreamSender::Channel::WaitForRpc() {
+  std::unique_lock<SpinLock> l(lock_);
+  return WaitForRpcLocked(&l);
+}
+
+Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock>* lock) {
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
@@ -518,7 +533,7 @@ Status KrpcDataStreamSender::Channel::TransmitData(
            << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
            << " #rows=" << outbound_batch->header()->num_rows();
   std::unique_lock<SpinLock> l(lock_);
-  RETURN_IF_ERROR(WaitForRpc(&l));
+  RETURN_IF_ERROR(WaitForRpcLocked(&l));
   DCHECK(!rpc_in_flight_);
   DCHECK(rpc_in_flight_batch_ == nullptr);
   // If the remote receiver is closed already, there is no point in sending anything.
@@ -603,8 +618,8 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
   return Status::OK();
 }
 
-Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
-  VLOG_RPC << "Channel::FlushAndSendEos() fragment_instance_id="
+Status KrpcDataStreamSender::Channel::FlushBatches() {
+  VLOG_RPC << "Channel::FlushBatches() fragment_instance_id="
            << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
            << " #rows= " << batch_->num_rows();
 
@@ -612,9 +627,16 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
   // we returned will be sent to the coordinator who will then cancel all the remote
   // fragments including the one that this sender is sending to.
   if (batch_->num_rows() > 0) RETURN_IF_ERROR(SendCurrentBatch());
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SendEosAsync() {
+  VLOG_RPC << "Channel::SendEosAsync() fragment_instance_id="
+           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
+           << " #rows= " << batch_->num_rows();
+  DCHECK_EQ(0, batch_->num_rows()) << "Batches must be flushed";
   {
     std::unique_lock<SpinLock> l(lock_);
-    RETURN_IF_ERROR(WaitForRpc(&l));
     DCHECK(!rpc_in_flight_);
     DCHECK(rpc_status_.ok());
     if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
@@ -623,16 +645,15 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
     rpc_in_flight_ = true;
     COUNTER_ADD(parent_->eos_sent_counter_, 1);
     RETURN_IF_ERROR(DoEndDataStreamRpc());
-    RETURN_IF_ERROR(WaitForRpc(&l));
   }
   return Status::OK();
 }
 
 void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
-  // Normally, FlushAndSendEos() should have been called before calling Teardown(),
-  // which means that all the data should already be drained. If the fragment was
-  // was closed or cancelled, there may still be some in-flight RPCs and buffered
-  // row batches to be flushed.
+  // Normally, the channel should have been flushed before calling Teardown(), which means
+  // that all the data should already be drained. If the fragment was closed or
+  // cancelled, there may still be some in-flight RPCs and buffered row batches to be
+  // flushed.
   std::unique_lock<SpinLock> l(lock_);
   shutdown_ = true;
   // Cancel any in-flight RPC.
@@ -991,11 +1012,22 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
   DCHECK(!flushed_);
   DCHECK(!closed_);
   flushed_ = true;
-  for (int i = 0; i < channels_.size(); ++i) {
-    // If we hit an error here, we can return without closing the remaining channels as
-    // the error is propagated back to the coordinator, which in turn cancels the query,
-    // which will cause the remaining open channels to be closed.
-    RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
+
+  // Send out the final row batches and EOS signals on all channels in parallel.
+  // If we hit an error here, we can return without closing the remaining channels as
+  // the error is propagated back to the coordinator, which in turn cancels the query,
+  // which will cause the remaining open channels to be closed.
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->FlushBatches());
+  }
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->WaitForRpc());
+  }
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->SendEosAsync());
+  }
+  for (Channel* channel : channels_) {
+    RETURN_IF_ERROR(channel->WaitForRpc());
   }
   return Status::OK();
 }