Posted to commits@impala.apache.org by kw...@apache.org on 2017/11/09 22:55:38 UTC

[3/3] incubator-impala git commit: IMPALA-4856: Port data stream service to KRPC

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC.
Similar to the thrift RPC implementation, there are 3 major components
to the data stream services: KrpcDataStreamSender serializes and sends
row batches materialized by a fragment instance to a KrpcDataStreamRecvr.
KrpcDataStreamMgr is responsible for routing an incoming row batch to
the appropriate receiver. The data stream service runs on the port
FLAGS_krpc_port which is 29000 by default.

Unlike the implementation with thrift RPC, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server).
Multiple queries can multiplex on the same connection for transmitting
row batches between two Impalad nodes. The asynchronous interface also
avoids the possibility that a thread is stuck in the RPC code for an
extended amount of time without checking for cancellation. A TransmitData()
call with KRPC is in essence a trio of RpcController, a serialized protobuf
request buffer and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row batches
are sent via "sidecars" in KRPC to avoid an extra copy into the serialized
request buffer.

Each impalad node creates a singleton DataStreamService object at start-up
time. All incoming calls are served by a service thread pool created as part
of DataStreamService. By default, the number of service threads equals the
number of logical cores. The service threads are shared across all queries so
the RPC handler should avoid blocking as much as possible. In the thrift RPC
implementation, a thrift thread handling a TransmitData() RPC blocks for an
extended period of time if the receiver has not yet been created when the call
arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests
which arrive before the receiver is ready are stored in a per-receiver early
sender list in KrpcDataStreamMgr. These RPC calls will be processed
and responded to when the receiver is created or when a timeout occurs.
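
The early sender list described above can be illustrated with a minimal,
single-threaded sketch. It is not the code in this patch: PendingRpc,
EarlySenderRegistry and the respond callback are hypothetical names, and the
real KrpcDataStreamMgr guards its maps with a lock and responds through the
KRPC RpcContext.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a parked RPC: the payload, its arrival time and a
// callback which sends the response once the request has been handled.
struct PendingRpc {
  std::string payload;
  std::chrono::steady_clock::time_point arrival;
  std::function<void(bool ok)> respond;
};

// Identifies a receiver: (fragment instance id, destination node id).
using RecvrId = std::pair<long, int>;

class EarlySenderRegistry {
 public:
  // Called by an RPC handler when no receiver exists yet. The handler can
  // return immediately; the response is sent later.
  void AddEarlySender(const RecvrId& id, PendingRpc rpc) {
    assert(rpc.respond != nullptr);  // A parked request must be answerable.
    early_senders_[id].push_back(std::move(rpc));
  }

  // Called when the receiver is created. Hands every parked payload to
  // 'deliver', responds with success and returns the number drained.
  size_t OnReceiverCreated(const RecvrId& id,
      const std::function<void(const std::string&)>& deliver) {
    auto it = early_senders_.find(id);
    if (it == early_senders_.end()) return 0;
    size_t drained = it->second.size();
    for (PendingRpc& rpc : it->second) {
      deliver(rpc.payload);
      rpc.respond(true);
    }
    early_senders_.erase(it);
    return drained;
  }

  // Called periodically. Fails every request which has waited longer than
  // 'timeout' and returns the number of requests which were failed.
  size_t ExpireOlderThan(std::chrono::steady_clock::time_point now,
      std::chrono::milliseconds timeout) {
    size_t expired = 0;
    for (auto it = early_senders_.begin(); it != early_senders_.end();) {
      std::vector<PendingRpc> kept;
      for (PendingRpc& rpc : it->second) {
        if (now - rpc.arrival > timeout) {
          rpc.respond(false);
          ++expired;
        } else {
          kept.push_back(std::move(rpc));
        }
      }
      it->second = std::move(kept);
      it = it->second.empty() ? early_senders_.erase(it) : std::next(it);
    }
    return expired;
  }

 private:
  std::map<RecvrId, std::vector<PendingRpc>> early_senders_;
};
```

The point of the design is that the handler thread never waits: it parks the
request and returns, and the response is produced by whichever event comes
first, receiver creation or the timeout sweep.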

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr.
If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer
limit, the request is stashed in a queue for deferred processing.
The stashed RPC requests will not be responded to until they are processed
so as to exert back pressure on the senders. An alternative would be to reply
with an error, in which case the request / row batches would need to be sent
again. That alternative may end up consuming more network bandwidth than the
thrift RPC implementation. This change adopts the behavior of allowing one
stashed request per sender.
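
The back pressure scheme above can be sketched as follows. This is a
simplified, single-threaded illustration rather than the KrpcDataStreamRecvr
code: BatchRpc, SenderQueue and num_deferred() are hypothetical names, and
admitting a batch into an empty queue regardless of its size is an assumption
of this sketch so that an oversized batch cannot be deferred forever.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical request: a serialized row batch plus a callback which sends
// the RPC response back to the sender.
struct BatchRpc {
  std::string batch;
  std::function<void()> respond;
};

class SenderQueue {
 public:
  explicit SenderQueue(int64_t byte_limit) : byte_limit_(byte_limit) {}

  // RPC handler path. Returns true if the batch was enqueued and responded to
  // immediately, false if the request was stashed for deferred processing.
  bool AddBatch(BatchRpc rpc) {
    if (!Fits(rpc)) {
      deferred_.push_back(std::move(rpc));  // No response yet: back pressure.
      return false;
    }
    Enqueue(std::move(rpc));
    return true;
  }

  // Consumer path. Pops one batch, then admits stashed requests while they
  // fit and only then responds to them. Returns false if the queue is empty.
  bool GetBatch(std::string* out) {
    if (batches_.empty()) return false;
    *out = std::move(batches_.front());
    batches_.pop_front();
    buffered_bytes_ -= static_cast<int64_t>(out->size());
    assert(buffered_bytes_ >= 0);
    while (!deferred_.empty() && Fits(deferred_.front())) {
      BatchRpc rpc = std::move(deferred_.front());
      deferred_.pop_front();
      Enqueue(std::move(rpc));
    }
    return true;
  }

  size_t num_deferred() const { return deferred_.size(); }

 private:
  // A batch fits if the queue is empty or the byte limit is not exceeded.
  bool Fits(const BatchRpc& rpc) const {
    int64_t size = static_cast<int64_t>(rpc.batch.size());
    return batches_.empty() || buffered_bytes_ + size <= byte_limit_;
  }

  void Enqueue(BatchRpc rpc) {
    buffered_bytes_ += static_cast<int64_t>(rpc.batch.size());
    batches_.push_back(std::move(rpc.batch));
    rpc.respond();  // The sender may now transmit its next batch.
  }

  const int64_t byte_limit_;
  int64_t buffered_bytes_ = 0;
  std::deque<std::string> batches_;
  std::deque<BatchRpc> deferred_;
};
```

If each sender waits for the response before transmitting its next batch, at
most one request per sender can sit in the deferred queue at any time, which
is one way to read the "one stashed request per sender" behavior.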

All RPC requests and responses are serialized using protobuf. The equivalent of
TRowBatch is ProtoRowBatch, which contains a serialized header holding the
metadata of the row batch and two Kudu Slice objects which contain pointers to
the actual data (i.e. tuple offsets and tuple data).
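
The "header plus two pointers" layout can be pictured with the sketch below.
It is only an illustration: ByteView stands in for a Kudu Slice, the
BatchHeader fields are hypothetical (the real ones are defined in
row_batch.proto), and one tuple offset per row is assumed for simplicity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Non-owning view of a byte range, similar in spirit to a Kudu Slice.
struct ByteView {
  const uint8_t* data;
  size_t size;
};

// Hypothetical header fields, chosen for illustration only.
struct BatchHeader {
  int32_t num_rows;
  int64_t uncompressed_size;
};

// Small metadata header plus two views pointing at the actual data.
struct RowBatchView {
  BatchHeader header;
  ByteView tuple_offsets;
  ByteView tuple_data;
};

// Builds a view over caller-owned buffers without copying them. The buffers
// must outlive the returned view.
inline RowBatchView MakeView(
    const std::vector<int32_t>& offsets, const std::string& data) {
  // A batch which carries tuple data must also carry tuple offsets.
  assert(!offsets.empty() || data.empty());
  RowBatchView view;
  view.header.num_rows = static_cast<int32_t>(offsets.size());
  view.header.uncompressed_size = static_cast<int64_t>(data.size());
  view.tuple_offsets = {reinterpret_cast<const uint8_t*>(offsets.data()),
      offsets.size() * sizeof(int32_t)};
  view.tuple_data = {reinterpret_cast<const uint8_t*>(data.data()), data.size()};
  return view;
}
```

Because the views only point at the buffers, the large payloads are never
copied into the header, which matches the motivation given earlier for
sending them as sidecars.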

This patch is based on an abandoned patch by Henry Robinson.

TESTING
-------

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-----

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Reviewed-on: http://gerrit.cloudera.org:8080/8023
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b4ea57a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b4ea57a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b4ea57a7

Branch: refs/heads/master
Commit: b4ea57a7e369c8c0532239db5bb95a701683a150
Parents: a772f84
Author: Michael Ho <kw...@cloudera.com>
Authored: Sun Aug 20 13:53:34 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Nov 9 20:05:08 2017 +0000

----------------------------------------------------------------------
 be/src/common/status.cc                   |  34 +-
 be/src/common/status.h                    |  12 +
 be/src/exec/data-sink.cc                  |  14 +-
 be/src/exec/exchange-node.cc              |   1 +
 be/src/exec/kudu-util.h                   |  11 +-
 be/src/rpc/CMakeLists.txt                 |   2 +
 be/src/rpc/rpc-mgr.cc                     |   4 +-
 be/src/rpc/rpc-mgr.h                      |   2 +
 be/src/runtime/CMakeLists.txt             |   5 +
 be/src/runtime/data-stream-mgr-base.h     |   8 +-
 be/src/runtime/data-stream-mgr.h          |   2 +-
 be/src/runtime/data-stream-recvr.h        |   2 +-
 be/src/runtime/data-stream-sender.h       |   8 +-
 be/src/runtime/exec-env.cc                |  15 +
 be/src/runtime/exec-env.h                 |   1 +
 be/src/runtime/krpc-data-stream-mgr.cc    | 377 ++++++++++++-
 be/src/runtime/krpc-data-stream-mgr.h     | 447 ++++++++++++++-
 be/src/runtime/krpc-data-stream-recvr.cc  | 592 ++++++++++++++++++-
 be/src/runtime/krpc-data-stream-recvr.h   | 201 ++++++-
 be/src/runtime/krpc-data-stream-sender.cc | 754 +++++++++++++++++++++++++
 be/src/runtime/krpc-data-stream-sender.h  | 187 ++++++
 be/src/runtime/row-batch.cc               | 200 +++++--
 be/src/runtime/row-batch.h                | 129 ++++-
 be/src/service/CMakeLists.txt             |  17 +-
 be/src/service/data-stream-service.cc     |  53 ++
 be/src/service/data-stream-service.h      |  54 ++
 be/src/service/impala-server.cc           |   2 +-
 cmake_modules/FindProtobuf.cmake          |   2 +-
 common/protobuf/CMakeLists.txt            |  26 +-
 common/protobuf/common.proto              |  39 ++
 common/protobuf/data_stream_service.proto |  80 +++
 common/protobuf/row_batch.proto           |  39 ++
 common/thrift/generate_error_codes.py     |   3 +-
 33 files changed, 3147 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index a009925..cda3e47 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -23,6 +23,7 @@
 #include "util/debug-util.h"
 
 #include "common/names.h"
+#include "gen-cpp/common.pb.h"
 #include "gen-cpp/ErrorCodes_types.h"
 
 namespace impala {
@@ -145,6 +146,10 @@ Status::Status(const TStatus& status) {
   FromThrift(status);
 }
 
+Status::Status(const StatusPB& status) {
+  FromProto(status);
+}
+
 Status& Status::operator=(const TStatus& status) {
   delete msg_;
   FromThrift(status);
@@ -203,7 +208,7 @@ const string Status::GetDetail() const {
 
 void Status::ToThrift(TStatus* status) const {
   status->error_msgs.clear();
-  if (msg_ == NULL) {
+  if (msg_ == nullptr) {
     status->status_code = TErrorCode::OK;
   } else {
     status->status_code = msg_->error();
@@ -213,6 +218,17 @@ void Status::ToThrift(TStatus* status) const {
   }
 }
 
+void Status::ToProto(StatusPB* status) const {
+  status->Clear();
+  if (msg_ == nullptr) {
+    status->set_status_code(TErrorCode::OK);
+  } else {
+    status->set_status_code(msg_->error());
+    status->add_error_msgs(msg_->msg());
+    for (const string& s : msg_->details()) status->add_error_msgs(s);
+  }
+}
+
 void Status::FromThrift(const TStatus& status) {
   if (status.status_code == TErrorCode::OK) {
     msg_ = NULL;
@@ -229,6 +245,22 @@ void Status::FromThrift(const TStatus& status) {
   }
 }
 
+void Status::FromProto(const StatusPB& status) {
+  if (status.status_code() == TErrorCode::OK) {
+    msg_ = nullptr;
+  } else {
+    msg_ = new ErrorMsg();
+    msg_->SetErrorCode(static_cast<TErrorCode::type>(status.status_code()));
+    if (status.error_msgs().size() > 0) {
+      // The first message is the actual error message. (See Status::ToThrift()).
+      msg_->SetErrorMsg(status.error_msgs().Get(0));
+      // The following messages are details.
+      std::for_each(status.error_msgs().begin() + 1, status.error_msgs().end(),
+          [&](string const& detail) { msg_->AddDetail(detail); });
+    }
+  }
+}
+
 void Status::FreeMessage() noexcept {
   delete msg_;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index c3b8d68..24dba8b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -34,6 +34,8 @@
 
 namespace impala {
 
+class StatusPB;
+
 /// Status is used as a function return type to indicate success, failure or cancellation
 /// of the function. In case of successful completion, it only occupies sizeof(void*)
 /// statically allocated memory and therefore no more members should be added to this
@@ -151,6 +153,10 @@ class NODISCARD Status {
   /// Retains the TErrorCode value and the message
   explicit Status(const TStatus& status);
 
+  /// "Copy" c'tor from StatusPB (a protobuf serialized version of Status object).
+  /// Retains the TErrorCode value and the message
+  explicit Status(const StatusPB& status);
+
   /// "Copy c'tor from HS2 TStatus.
   /// Retains the TErrorCode value and the message
   explicit Status(const apache::hive::service::cli::thrift::TStatus& hs2_status);
@@ -240,6 +246,9 @@ class NODISCARD Status {
   /// Convert into TStatus.
   void ToThrift(TStatus* status) const;
 
+  /// Serialize into StatusPB
+  void ToProto(StatusPB* status) const;
+
   /// Returns the formatted message of the error message and the individual details of the
   /// additional messages as a single string. This should only be called internally and
   /// not to report an error back to the client.
@@ -265,6 +274,9 @@ class NODISCARD Status {
   /// A non-inline function for unwrapping a TStatus object.
   void FromThrift(const TStatus& status);
 
+  /// A non-inline function for unwrapping a StatusPB object.
+  void FromProto(const StatusPB& status);
+
   /// Status uses a naked pointer to ensure the size of an instance on the stack is only
   /// the sizeof(ErrorMsg*). Every Status owns its ErrorMsg instance.
   ErrorMsg* msg_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a319860..59439cb 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -32,6 +32,7 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/data-stream-sender.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
@@ -60,11 +61,14 @@ Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
 
-      // TODO: Remove DCHECK when KRPC is supported.
-      DCHECK(!FLAGS_use_krpc);
-      // TODO: figure out good buffer size based on size of output row
-      *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
-          thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
+      if (FLAGS_use_krpc) {
+        *sink = pool->Add(new KrpcDataStreamSender(fragment_instance_ctx.sender_id,
+            row_desc, thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
+      } else {
+        // TODO: figure out good buffer size based on size of output row
+        *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
+            thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
+      }
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index b39bcbf..353a59b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -106,6 +106,7 @@ void ExchangeNode::Codegen(RuntimeState* state) {
 Status ExchangeNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_CANCELLED(state);
   if (is_merging_) {
     // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 5de55fe..11cf16a 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -39,11 +39,10 @@ namespace impala {
     } \
   } while (0)
 
-
 #define KUDU_ASSERT_OK(status)                                     \
   do {                                                             \
-    const Status& status_ = FromKuduStatus(status);                \
-    ASSERT_TRUE(status_.ok()) << "Error: " << status_.GetDetail(); \
+    const Status& _s = FromKuduStatus(status);                     \
+    ASSERT_TRUE(_s.ok()) << "Error: " << _s.GetDetail();           \
   } while (0)
 
 class TimestampValue;
@@ -97,8 +96,10 @@ ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType typ
 inline Status FromKuduStatus(
     const kudu::Status& k_status, const std::string prepend = "") {
   if (LIKELY(k_status.ok())) return Status::OK();
-  if (prepend.empty()) return Status(k_status.ToString());
-  return Status(strings::Substitute("$0: $1", prepend, k_status.ToString()));
+  const string& err_msg = prepend.empty() ? k_status.ToString() :
+      strings::Substitute("$0: $1", prepend, k_status.ToString());
+  VLOG(1) << err_msg;
+  return Status::Expected(err_msg);
 }
 
 } /// namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 326b806..1985c6c 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -22,10 +22,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 
 # Mark the protobuf files as generated
+set_source_files_properties(${COMMON_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Rpc
   authentication.cc
+  ${COMMON_PROTO_SRCS}
   rpc-mgr.cc
   rpc-trace.cc
   TAcceptQueueServer.cpp

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index f6491be..6a9d2d4 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -44,6 +44,8 @@ DEFINE_int32(num_acceptor_threads, 2,
 DEFINE_int32(num_reactor_threads, 0,
     "Number of threads dedicated to managing network IO for RPC services. If left at "
     "default value 0, it will be set to number of CPU cores.");
+DEFINE_int32(rpc_retry_interval_ms, 5,
+    "Time in millisecond of waiting before retrying an RPC when remote is busy");
 
 namespace impala {
 
@@ -72,7 +74,7 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
       messenger_->RegisterService(service_pool->service_name(), service_pool),
       "Could not register service");
   service_pools_.push_back(service_pool);
-
+  VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index d414bb6..85f88b6 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -149,6 +149,8 @@ class RpcMgr {
     return messenger_->metric_entity();
   }
 
+  std::shared_ptr<kudu::rpc::Messenger> messenger() { return messenger_; }
+
   ~RpcMgr() {
     DCHECK_EQ(service_pools_.size(), 0)
         << "Must call Shutdown() before destroying RpcMgr";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 4d95dda..41805af 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -23,6 +23,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 # where to put generated binaries
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 
+# Mark the protobuf file as generated
+set_source_files_properties(${ROW_BATCH_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
 add_library(Runtime
   buffered-tuple-stream.cc
   client-cache.cc
@@ -45,6 +48,7 @@ add_library(Runtime
   initial-reservations.cc
   krpc-data-stream-mgr.cc
   krpc-data-stream-recvr.cc
+  krpc-data-stream-sender.cc
   lib-cache.cc
   mem-tracker.cc
   mem-pool.cc
@@ -56,6 +60,7 @@ add_library(Runtime
   raw-value.cc
   raw-value-ir.cc
   row-batch.cc
+  ${ROW_BATCH_PROTO_SRCS}
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index 7886cfa..0e392e3 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
+#include "util/aligned-new.h"
 
 namespace impala {
 
@@ -35,7 +36,7 @@ class TUniqueId;
 /// TODO: This is a temporary pure virtual base class that defines the basic interface for
 /// 2 parallel implementations of the DataStreamMgrBase, one each for Thrift and KRPC.
 /// Remove this in favor of the KRPC implementation when possible.
-class DataStreamMgrBase {
+class DataStreamMgrBase : public CacheLineAligned {
  public:
   DataStreamMgrBase() {}
 
@@ -47,11 +48,6 @@ class DataStreamMgrBase {
       PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
       RuntimeProfile* profile, bool is_merging) = 0;
 
-  /// Notifies the recvr associated with the fragment/node id that the specified
-  /// sender has closed.
-  virtual Status CloseSender(const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int sender_id) = 0;
-
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 1e9f4b6..07f7c56 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -95,7 +95,7 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// sender has closed.
   /// Returns OK if successful, error status otherwise.
   Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-      int sender_id) override;
+      int sender_id);
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   void Cancel(const TUniqueId& fragment_instance_id) override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 9545f82..37e8f70 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -134,7 +134,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   /// soft upper limit on the total amount of buffering allowed for this stream across
   /// all sender queues. we stop acking incoming data once the amount of buffered data
   /// exceeds this value
-  int total_buffer_limit_;
+  int64_t total_buffer_limit_;
 
   /// Row schema.
   const RowDescriptor* row_desc_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 8ed4bd0..598f3b9 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -95,16 +95,16 @@ class DataStreamSender : public DataSink {
   /// used to maintain metrics.
   Status SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers = 1);
 
-  /// Return total number of bytes sent in TRowBatch.data. If batches are
-  /// broadcast to multiple receivers, they are counted once per receiver.
-  int64_t GetNumDataBytesSent() const;
-
  protected:
   friend class DataStreamTest;
 
   virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
       const TDataSink& tsink, RuntimeState* state);
 
+  /// Return total number of bytes sent in TRowBatch.data. If batches are
+  /// broadcast to multiple receivers, they are counted once per receiver.
+  int64_t GetNumDataBytesSent() const;
+
  private:
   class Channel;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 20e921b..0ceb636 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -28,6 +28,7 @@
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
 #include "gen-cpp/ImpalaInternalService.h"
+#include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -47,6 +48,7 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/scheduler.h"
+#include "service/data-stream-service.h"
 #include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
@@ -64,6 +66,7 @@
 #include "common/names.h"
 
 using boost::algorithm::join;
+using kudu::rpc::ServiceIf;
 using namespace strings;
 
 DEFINE_bool_hidden(use_statestore, true, "Deprecated, do not use");
@@ -81,6 +84,11 @@ DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use KRPC for th
     "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. "
     "KRPC not yet supported");
 
+DEFINE_int32(datastream_service_queue_depth, 1024, "Size of datastream service queue");
+DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of datastream service "
+    "processing threads. If left at default value 0, it will be set to number of CPU "
+    "cores.");
+
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
@@ -168,6 +176,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
     backend_address_(MakeNetworkAddress(hostname, backend_port)) {
 
   if (FLAGS_use_krpc) {
+    VLOG_QUERY << "Using KRPC.";
     // KRPC relies on resolved IP address. It's set in StartServices().
     krpc_address_.__set_port(krpc_port);
     rpc_mgr_.reset(new RpcMgr());
@@ -293,7 +302,13 @@ Status ExecEnv::Init() {
   // Initialize the RPCMgr before allowing services registration.
   if (FLAGS_use_krpc) {
     krpc_address_.__set_hostname(ip_address_);
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init());
     RETURN_IF_ERROR(rpc_mgr_->Init());
+    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
+        FLAGS_datastream_service_queue_depth, move(data_svc)));
   }
 
   mem_tracker_.reset(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 416f855..df0d926 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -125,6 +125,7 @@ class ExecEnv {
   RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
+  RpcMgr* rpc_mgr() const { return rpc_mgr_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index a3ed417..b70bca6 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -17,41 +17,376 @@
 
 #include "runtime/krpc-data-stream-mgr.h"
 
-#include "common/logging.h"
+#include <iostream>
+#include <boost/functional/hash.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "kudu/rpc/rpc_context.h"
+
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
+#include "util/uid-util.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+
+#include "common/names.h"
+
+/// This parameter controls the minimum amount of time a closed stream ID will stay in
+/// closed_stream_cache_ before it is evicted. It needs to be set sufficiently high that
+/// it will outlive all the calls to FindRecvr() for that stream ID, to distinguish
+/// between was-here-but-now-gone and never-here states for the receiver. If the stream
+/// ID expires before a call to FindRecvr(), the sender will see an error which will lead
+/// to query cancellation. Setting this value higher will increase the size of the stream
+/// cache (which is roughly 48 bytes per receiver).
+/// TODO: We don't need millisecond precision here.
+const int32_t STREAM_EXPIRATION_TIME_MS = 300 * 1000;
 
 DECLARE_bool(use_krpc);
+DECLARE_int32(datastream_sender_timeout_ms);
+DEFINE_int32(datastream_service_num_deserialization_threads, 16,
+    "Number of threads for deserializing RPC requests deferred due to the receiver "
+    "not ready or the soft limit of the receiver is reached.");
+
+using boost::mutex;
 
 namespace impala {
 
-[[noreturn]] static void AbortUnsupportedFeature() {
-  // We should have gotten here only if the FLAGS_use_krpc is set to true.
-  CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 'use_krpc' "
-      "is true.";
-  // KRPC isn't supported yet, so abort.
-  ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag to "
-      "false and restart the cluster.");
+KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
+  : deserialize_pool_("data-stream-mgr", "deserialize",
+      FLAGS_datastream_service_num_deserialization_threads, 10000,
+      boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
+  MetricGroup* dsm_metrics = metrics->GetOrCreateChildGroup("datastream-manager");
+  num_senders_waiting_ =
+      dsm_metrics->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+  total_senders_waited_ =
+      dsm_metrics->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = dsm_metrics->AddCounter<int64_t>(
+      "total-senders-timedout-waiting-for-recvr-creation", 0L);
+}
+
+Status KrpcDataStreamMgr::Init() {
+  RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
+      [this](){ this->Maintenance(); }, &maintenance_thread_));
+  RETURN_IF_ERROR(deserialize_pool_.Init());
+  return Status::OK();
+}
+
+inline uint32_t KrpcDataStreamMgr::GetHashValue(
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id) {
+  uint32_t value = RawValue::GetHashValue(&fragment_instance_id.lo, TYPE_BIGINT, 0);
+  value = RawValue::GetHashValue(&fragment_instance_id.hi, TYPE_BIGINT, value);
+  value = RawValue::GetHashValue(&dest_node_id, TYPE_INT, value);
+  return value;
+}
+
+shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+    RuntimeState* state, const RowDescriptor* row_desc,
+    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
+    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
+
+  DCHECK(profile != nullptr);
+  VLOG_FILE << "creating receiver for fragment="<< finst_id
+            << ", node=" << dest_node_id;
+  shared_ptr<KrpcDataStreamRecvr> recvr(
+      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
+          finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  {
+    RecvrId recvr_id = make_pair(finst_id, dest_node_id);
+    lock_guard<mutex> l(lock_);
+    fragment_recvr_set_.insert(recvr_id);
+    receiver_map_.insert(make_pair(hash_value, recvr));
+
+    EarlySendersMap::iterator it = early_senders_map_.find(recvr_id);
+    if (it != early_senders_map_.end()) {
+      EarlySendersList& early_senders = it->second;
+      // Let the receiver take over the RPC payloads of early senders and process them
+      // asynchronously.
+      for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
+        recvr->TakeOverEarlySender(move(ctx));
+        num_senders_waiting_->Increment(-1);
+      }
+      for (const unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
+        recvr->RemoveSender(ctx->request->sender_id());
+        Status::OK().ToProto(ctx->response->mutable_status());
+        ctx->rpc_context->RespondSuccess();
+        num_senders_waiting_->Increment(-1);
+      }
+      early_senders_map_.erase(it);
+    }
+  }
+  return recvr;
+}
+
+shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::FindRecvr(
+    const TUniqueId& finst_id, PlanNodeId dest_node_id, bool* already_unregistered) {
+  VLOG_ROW << "looking up fragment_instance_id=" << finst_id
+           << ", node=" << dest_node_id;
+  *already_unregistered = false;
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  pair<RecvrMap::iterator, RecvrMap::iterator> range =
+      receiver_map_.equal_range(hash_value);
+  while (range.first != range.second) {
+    shared_ptr<KrpcDataStreamRecvr> recvr = range.first->second;
+    if (recvr->fragment_instance_id() == finst_id &&
+        recvr->dest_node_id() == dest_node_id) {
+      return recvr;
+    }
+    ++range.first;
+  }
+  RecvrId recvr_id = make_pair(finst_id, dest_node_id);
+  if (closed_stream_cache_.find(recvr_id) != closed_stream_cache_.end()) {
+    *already_unregistered = true;
+  }
+  return shared_ptr<KrpcDataStreamRecvr>();
+}
+
+void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
+    const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+    kudu::rpc::RpcContext* rpc_context) {
+  RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
+  auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
+  early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
+  num_senders_waiting_->Increment(1);
+  total_senders_waited_->Increment(1);
 }
 
-[[noreturn]] KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
+    const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
+    kudu::rpc::RpcContext* rpc_context) {
+  RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
+  auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
+  early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
+  num_senders_waiting_->Increment(1);
+  total_senders_waited_->Increment(1);
 }
 
-KrpcDataStreamMgr::~KrpcDataStreamMgr(){}
+void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+  TPlanNodeId dest_node_id = request->dest_node_id();
+  VLOG_ROW << "AddData(): finst_id=" << PrintId(finst_id)
+           << " node_id=" << request->dest_node_id()
+           << " #rows=" << request->row_batch_header().num_rows()
+           << " sender_id=" << request->sender_id();
+  bool already_unregistered = false;
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    lock_guard<mutex> l(lock_);
+    recvr = FindRecvr(finst_id, request->dest_node_id(), &already_unregistered);
+    // If no receiver is found and it's not in the closed stream cache, best guess is
+    // that it is still preparing, so add payload to per-receiver early senders' list.
+    // If the receiver doesn't show up after FLAGS_datastream_sender_timeout_ms ms
+    // (e.g. if the receiver was closed and has already been retired from the
+    // closed_stream_cache_), the sender is timed out by the maintenance thread.
+    if (!already_unregistered && recvr == nullptr) {
+      AddEarlySender(finst_id, request, response, rpc_context);
+      return;
+    }
+  }
+  if (already_unregistered) {
+    // The receiver may remove itself from the receiver map via DeregisterRecvr() at any
+    // time without considering the remaining number of senders. As a consequence,
+    // FindRecvr() may return nullptr even though the receiver was once present. We
+    // detect this case by checking already_unregistered - if true then the receiver was
+    // already closed deliberately, and there's no unexpected error here.
+    Status(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), dest_node_id)
+        .ToProto(response->mutable_status());
+    rpc_context->RespondSuccess();
+    return;
+  }
+  DCHECK(recvr != nullptr);
+  recvr->AddBatch(request, response, rpc_context);
+}
 
-    [[noreturn]] std::shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
-        RuntimeState* state, const RowDescriptor* row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-        int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
+    PlanNodeId dest_node_id, int sender_id, int num_requests) {
+  for (int i = 0; i < num_requests; ++i) {
+    DeserializeTask payload = {finst_id, dest_node_id, sender_id};
+    deserialize_pool_.Offer(move(payload));
+  }
 }
 
-[[noreturn]] Status KrpcDataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int sender_id) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::DeserializeThreadFn(int thread_id, const DeserializeTask& task) {
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    bool already_unregistered;
+    lock_guard<mutex> l(lock_);
+    recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
+    DCHECK(recvr != nullptr || already_unregistered);
+  }
+  if (recvr != nullptr) recvr->DequeueDeferredRpc(task.sender_id);
+}
+
+void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
+    EndDataStreamResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+  VLOG_ROW << "CloseSender(): instance_id=" << PrintId(finst_id)
+           << " node_id=" << request->dest_node_id()
+           << " sender_id=" << request->sender_id();
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    lock_guard<mutex> l(lock_);
+    bool already_unregistered;
+    recvr = FindRecvr(finst_id, request->dest_node_id(), &already_unregistered);
+    // If no receiver is found and it's not in the closed stream cache, we still need
+    // to make sure that the close operation is performed so add to per-recvr list of
+    // pending closes. It's possible for a sender to issue EOS RPC without sending any
+    // rows if no rows are materialized at all in the sender side.
+    if (!already_unregistered && recvr == nullptr) {
+      AddEarlyClosedSender(finst_id, request, response, rpc_context);
+      return;
+    }
+  }
+
+  // If we reach this point, either the receiver is found or it has been unregistered
+  // already. In either case, it's safe to just return an OK status.
+  if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
+  Status::OK().ToProto(response->mutable_status());
+  rpc_context->RespondSuccess();
+
+  {
+    // TODO: Move this to maintenance thread.
+    // Remove any closed streams that have been in the cache for more than
+    // STREAM_EXPIRATION_TIME_MS.
+    lock_guard<mutex> l(lock_);
+    ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
+    int64_t now = MonotonicMillis();
+    int32_t before = closed_stream_cache_.size();
+    while (it != closed_stream_expirations_.end() && it->first < now) {
+      closed_stream_cache_.erase(it->second);
+      closed_stream_expirations_.erase(it++);
+    }
+    DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
+    int32_t after = closed_stream_cache_.size();
+    if (before != after) {
+      VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
+                 << ", eviction took: "
+                 << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
+    }
+  }
+}
+
+Status KrpcDataStreamMgr::DeregisterRecvr(
+    const TUniqueId& finst_id, PlanNodeId dest_node_id) {
+  VLOG_QUERY << "DeregisterRecvr(): fragment_instance_id=" << finst_id
+             << ", node=" << dest_node_id;
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  lock_guard<mutex> l(lock_);
+  pair<RecvrMap::iterator, RecvrMap::iterator> range =
+      receiver_map_.equal_range(hash_value);
+  while (range.first != range.second) {
+    const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second;
+    if (recvr->fragment_instance_id() == finst_id &&
+        recvr->dest_node_id() == dest_node_id) {
+      // Notify concurrent AddData() requests that the stream has been terminated.
+      recvr->CancelStream();
+      RecvrId recvr_id =
+          make_pair(recvr->fragment_instance_id(), recvr->dest_node_id());
+      fragment_recvr_set_.erase(recvr_id);
+      receiver_map_.erase(range.first);
+      closed_stream_expirations_.insert(
+          make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, recvr_id));
+      closed_stream_cache_.insert(recvr_id);
+      return Status::OK();
+    }
+    ++range.first;
+  }
+
+  const string msg = Substitute(
+      "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1",
+      PrintId(finst_id), dest_node_id);
+  return Status(msg);
+}
+
+void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
+  VLOG_QUERY << "cancelling all streams for fragment=" << finst_id;
+  lock_guard<mutex> l(lock_);
+  FragmentRecvrSet::iterator iter =
+      fragment_recvr_set_.lower_bound(make_pair(finst_id, 0));
+  while (iter != fragment_recvr_set_.end() && iter->first == finst_id) {
+    bool unused;
+    shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first, iter->second, &unused);
+    if (recvr != nullptr) {
+      recvr->CancelStream();
+    } else {
+      // keep going but at least log it
+      LOG(ERROR) << Substitute("Cancel(): missing in stream_map: fragment=$0 node=$1",
+          PrintId(iter->first), iter->second);
+    }
+    ++iter;
+  }
+}
+
+template<typename ContextType, typename RequestPBType>
+void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx) {
+  const RequestPBType* request = ctx->request;
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+
+  Status(TErrorCode::DATASTREAM_SENDER_TIMEOUT, PrintId(finst_id)).ToProto(
+      ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
+  num_senders_waiting_->Increment(-1);
+  num_senders_timedout_->Increment(1);
+}
+
+void KrpcDataStreamMgr::Maintenance() {
+  while (true) {
+    // Notify any senders that have been waiting too long for their receiver to
+    // appear. Keep lock_ held for only a short amount of time.
+    vector<EarlySendersList> timed_out_senders;
+    {
+      int64_t now = MonotonicMillis();
+      lock_guard<mutex> l(lock_);
+      auto it = early_senders_map_.begin();
+      while (it != early_senders_map_.end()) {
+        if (now - it->second.arrival_time > FLAGS_datastream_sender_timeout_ms) {
+          timed_out_senders.emplace_back(move(it->second));
+          it = early_senders_map_.erase(it);
+        } else {
+          ++it;
+        }
+      }
+    }
+
+    // Send responses to all timed-out senders. We need to propagate the time-out errors
+    // to senders which sent EOS RPC so all query fragments will eventually be cancelled.
+    // Otherwise, the receiver may hang when it eventually gets created as the timed-out
+    // EOS will be lost forever.
+    for (const EarlySendersList& senders_queue : timed_out_senders) {
+      for (const unique_ptr<TransmitDataCtx>& ctx : senders_queue.waiting_sender_ctxs) {
+        RespondToTimedOutSender<TransmitDataCtx, TransmitDataRequestPB>(ctx);
+      }
+      for (const unique_ptr<EndDataStreamCtx>& ctx : senders_queue.closed_sender_ctxs) {
+        RespondToTimedOutSender<EndDataStreamCtx, EndDataStreamRequestPB>(ctx);
+      }
+    }
+    bool timed_out = false;
+    // Wait for 10s
+    shutdown_promise_.Get(10000, &timed_out);
+    if (!timed_out) return;
+  }
 }
 
-[[noreturn]] void KrpcDataStreamMgr::Cancel(const TUniqueId& fragment_instance_id) {
-  AbortUnsupportedFeature();
+KrpcDataStreamMgr::~KrpcDataStreamMgr() {
+  shutdown_promise_.Set(true);
+  deserialize_pool_.Shutdown();
+  LOG(INFO) << "Waiting for data-stream-mgr maintenance thread...";
+  maintenance_thread_->Join();
+  LOG(INFO) << "Waiting for deserialization thread pool...";
+  deserialize_pool_.Join();
 }
 
 } // namespace impala
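
Note on the closed-stream bookkeeping used by DeregisterRecvr() and CloseSender()
above: the manager pairs a set of recently closed receiver ids with a time-ordered
multimap so that expired entries can be evicted from the front. The following is a
minimal standalone sketch of that pattern, not the code from this patch. The names
ClosedStreamCache, StreamId, MarkClosed(), Contains() and EvictExpired() are
hypothetical, timestamps are passed in explicitly instead of read from
MonotonicMillis(), and std::set stands in for boost::unordered_set to avoid needing
a hash function for the id pair.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <utility>

// Hypothetical stand-in for RecvrId: (fragment instance id, plan node id).
using StreamId = std::pair<int64_t, int>;

// Illustrative sketch only; this is not the class from the patch.
class ClosedStreamCache {
 public:
  explicit ClosedStreamCache(int64_t ttl_ms) : ttl_ms_(ttl_ms) {}

  // Records 'id' as closed at 'now_ms', similar in spirit to DeregisterRecvr().
  // Unlike the patch, a duplicate id is ignored to keep both containers in sync.
  void MarkClosed(const StreamId& id, int64_t now_ms) {
    if (!cache_.insert(id).second) return;
    expirations_.insert({now_ms + ttl_ms_, id});
  }

  // Returns true if 'id' was closed recently and has not been evicted yet.
  bool Contains(const StreamId& id) const { return cache_.count(id) > 0; }

  // Evicts every entry whose expiration time is strictly less than 'now_ms'.
  // Returns the number of evicted entries.
  int EvictExpired(int64_t now_ms) {
    int evicted = 0;
    auto it = expirations_.begin();
    while (it != expirations_.end() && it->first < now_ms) {
      cache_.erase(it->second);
      it = expirations_.erase(it);
      ++evicted;
    }
    // Same invariant the patch checks with DCHECK_EQ.
    assert(cache_.size() == expirations_.size());
    return evicted;
  }

 private:
  const int64_t ttl_ms_;
  // Ordered by expiration time; several streams may share one expiration time.
  std::multimap<int64_t, StreamId> expirations_;
  std::set<StreamId> cache_;
};
```

Because the multimap is ordered by expiration time, eviction stops at the first
entry that has not expired yet, so each call only visits the entries it removes.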

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index ef9bb45..5171e80 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -20,33 +20,458 @@
 
 #include "runtime/data-stream-mgr-base.h"
 
+#include <list>
+#include <queue>
+#include <set>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/unordered_map.hpp>
+#include <boost/unordered_set.hpp>
+
 #include "common/status.h"
+#include "common/object-pool.h"
+#include "runtime/data-stream-mgr-base.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
+#include "runtime/row-batch.h"
+#include "util/metrics.h"
+#include "util/promise.h"
+#include "util/runtime-profile.h"
+#include "util/thread-pool.h"
+#include "gen-cpp/Types_types.h"  // for TUniqueId
+
+#include "gutil/macros.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
 
 namespace impala {
 
-class DataStreamRecvrBase;
-class MetricGroup;
-class RuntimeProfile;
+class DescriptorTbl;
+class EndDataStreamRequestPB;
+class EndDataStreamResponsePB;
+class KrpcDataStreamRecvr;
 class RuntimeState;
-class TRowBatch;
+class TransmitDataRequestPB;
+class TransmitDataResponsePB;
+
+/// TRANSMIT DATA PROTOCOL
+/// ----------------------
+///
+/// Impala daemons send tuple data between themselves using a transmission protocol that
+/// is managed by DataStreamMgr and related classes. Batches of tuples are sent between
+/// fragment instances using the TransmitData() RPC. The data transmitted are usually sent
+/// in batches across multiple RPCs. The logical connection between a pair of client and
+/// server is known as a 'channel'. Clients and servers are referred to as 'senders' and
+/// 'receivers' and are implemented by DataStreamSender and DataStreamRecvr respectively.
+/// Please note that the number of senders and number of receivers in a stream aren't
+/// necessarily the same. We refer to the on-going transmissions between m senders and n
+/// receivers as an 'm:n data stream'.
+///
+/// DataStreamMgr is a singleton class that lives for as long as the Impala process, and
+/// manages all streams for all queries. DataStreamRecvr and DataStreamSender have
+/// lifetimes tied to their containing fragment instances.
+///
+/// The protocol proceeds in three phases.
+///
+/// Phase 1: Channel establishment
+/// ------------------------------
+///
+/// In the first phase the sender initiates a channel with a receiver by sending its
+/// first batch. Since the sender may start sending before the receiver is ready, the data
+/// stream manager waits until the receiver has finished initialization and then passes
+/// the sender's request to the receiver. If the receiver does not appear within a
+/// configurable timeout, the data stream manager notifies the sender directly by
+/// returning DATASTREAM_SENDER_TIMEOUT. Note that a sender may have multiple channels,
+/// each of which needs to be initialized with the corresponding receiver.
+///
+/// The sender does not distinguish this phase from the steady-state data transmission
+/// phase, so may time-out etc. as described below.
+///
+/// Phase 2: Data transmission
+/// --------------------------
+///
+/// After the first batch has been received, a sender continues to send batches, one at
+/// a time (so only one TransmitData() RPC per sender is pending completion at any one
+/// time). The rate of transmission is controlled by the receiver: a sender will only
+/// schedule batch transmission when the previous transmission completes successfully.
+/// When a batch is received, a receiver will do one of two things: (1) deserialize it
+/// immediately and add it to the 'batch queue' or (2) defer the deserialization and
+/// respond to the RPC later if the batch queue is full. In the second case, the RPC
+/// state is saved into the receiver's 'deferred_rpcs_' queue. When space becomes
+/// available in the batch queue, the longest-waiting RPC is removed from the
+/// 'deferred_rpcs_' queue and the row batch is deserialized. In both cases, the RPC is
+/// replied to when the batch has been deserialized and added to the batch queue. The
+/// sender will then send its next batch.
+///
+/// Phase 3: End of stream
+/// ----------------------
+///
+/// When the stream is terminated, clients will send EndDataStream() RPCs to the servers.
+/// This RPC will not be sent until after the final TransmitData() RPC has completed and
+/// the stream's contents have been delivered. After EndDataStream() is received, no more
+/// TransmitData() RPCs should be expected from this sender.
+///
+/// Exceptional conditions: cancellation, timeouts, failure
+/// -------------------------------------------------------
+///
+/// The protocol must deal with the following complications: asynchronous cancellation of
+/// either the receiver or sender, timeouts during RPC transmission, and failure of either
+/// the receiver or sender.
+///
+/// 1. Cancellation
+///
+/// If the receiver is cancelled (or closed for any other reason, like reaching a limit)
+/// before the sender has completed the stream it will be torn down immediately. Any
+/// incomplete senders may not be aware of this, and will continue to send batches. The
+/// data stream manager on the receiver keeps a record of recently completed receivers so
+/// that it may intercept the 'late' data transmissions and immediately reject them with
+/// an error that signals the sender should terminate. The record is removed after a
+/// certain period of time.
+///
+/// It's possible for the closed receiver record to be removed before all senders have
+/// completed. It is usual that the coordinator will initiate cancellation (e.g. the
+/// query is unregistered after initial result rows are fetched once the limit is hit)
+/// before the timeout period expires so the sender will be cancelled already. However,
+/// it can also occur that the query may not complete before the timeout has elapsed.
+/// A sender which sends a row batch after the timeout has elapsed may hit time-out and
+/// fail the query. This problem is being tracked in IMPALA-3990.
+///
+/// The sender RPCs are sent asynchronously to the main thread of fragment instance
+/// execution. Senders do not block in TransmitData() RPCs, and may be cancelled at any
+/// time. If an in-flight RPC is cancelled at the sender side, the reply from the receiver
+/// will be silently dropped by the RPC layer.
+///
+/// 2. Timeouts during RPC transmission
+///
+/// Since RPCs may be arbitrarily delayed in the pending sender queue, the TransmitData()
+/// RPC has no RPC-level timeout. Instead, the receiver returns an error to the sender if
+/// a timeout occurs during the initial channel establishment phase. Since the
+/// TransmitData() RPC is asynchronous from the sender, the sender may continue to check
+/// for cancellation while it is waiting for a response from the receiver.
+///
+/// 3. Node or instance failure
+///
+/// If the receiver node fails, RPCs will fail fast and the sending fragment instance will
+/// be cancelled.
+///
+/// If a sender node fails, or the receiver node hangs, the coordinator should detect the
+/// failure and cancel all fragments.
+///
+/// TODO: Fix IMPALA-3990: use a non-time-based approach for removing the closed stream
+/// receiver.
+///
+
+/// Context for a TransmitData() RPC. This structure is constructed when the processing of
+/// an RPC is deferred because the receiver isn't prepared or the 'batch_queue' is full.
+struct TransmitDataCtx {
+  /// Request data structure, memory owned by 'rpc_context'. This contains various info
+  /// such as the destination finst ID, plan node ID and the row batch header.
+  const TransmitDataRequestPB* request;
+
+  /// Response data structure, will be serialized back to client after 'rpc_context' is
+  /// responded to.
+  TransmitDataResponsePB* response;
+
+  /// RpcContext owns the memory of all data structures related to the incoming RPC call
+  /// such as the serialized request buffer, response buffer and any sidecars. Must be
+  /// responded to once this RPC is finished with. RpcContext will delete itself once it
+  /// has been responded to. Not owned.
+  kudu::rpc::RpcContext* rpc_context;
+
+  TransmitDataCtx(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* rpc_context)
+    : request(request), response(response), rpc_context(rpc_context) { }
+};
+
+/// Context for an EndDataStream() RPC. This structure is constructed when the RPC is
+/// queued by the data stream manager for deferred processing when the receiver isn't
+/// prepared.
+struct EndDataStreamCtx {
+  /// Request data structure, memory owned by 'rpc_context'.
+  const EndDataStreamRequestPB* request;
+
+  /// Response data structure, will be serialized back to client after 'rpc_context' is
+  /// responded to. Memory owned by 'rpc_context'.
+  EndDataStreamResponsePB* response;
+
+  /// Must be responded to once this RPC is finished with. RpcContext will delete itself
+  /// once it has been responded to. Not owned.
+  kudu::rpc::RpcContext* rpc_context;
 
+  EndDataStreamCtx(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, kudu::rpc::RpcContext* rpc_context)
+    : request(request), response(response), rpc_context(rpc_context) { }
+};
+
+/// Singleton class which manages all incoming data streams at a backend node.
+/// It provides both producer and consumer functionality for each data stream.
+///
+/// - RPC service threads use this to add incoming data to streams in response to
+///   TransmitData() RPCs (AddData()) or to signal end-of-stream conditions
+///   (CloseSender()).
+/// - Exchange nodes extract data from an incoming stream via a KrpcDataStreamRecvr,
+///   which is created with CreateRecvr().
+///
+/// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
+/// which unblocks all KrpcDataStreamRecvr::GetBatch() calls that are made on behalf
+/// of the cancelled fragment id.
+///
+/// Exposes three metrics:
+///  'senders-blocked-on-recvr-creation' - currently blocked senders.
+///  'total-senders-blocked-on-recvr-creation' - total number of blocked senders over
+///  time.
+///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
+///  timed-out while waiting for a receiver.
+///
+/// TODO: The recv buffers used in KrpcDataStreamRecvr should count against
+/// per-query memory limits.
 class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
-  [[noreturn]] KrpcDataStreamMgr(MetricGroup* metrics);
-  virtual ~KrpcDataStreamMgr() override;
+  KrpcDataStreamMgr(MetricGroup* metrics);
+
+  /// Initialize the deserialization thread pool and create the maintenance thread.
+  /// Return error status on failure. Return OK otherwise.
+  Status Init();
 
-  [[noreturn]] std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
+  /// Create a receiver for a specific fragment_instance_id/dest_node_id.
+  /// If is_merging is true, the receiver maintains a separate queue of incoming row
+  /// batches for each sender and merges the sorted streams from each sender into a
+  /// single stream.
+  /// Ownership of the receiver is shared between this DataStream mgr instance and the
+  /// caller.
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
       RuntimeProfile* profile, bool is_merging) override;
 
-  [[noreturn]] Status CloseSender(const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int sender_id) override;
+  /// Handler for TransmitData() RPC.
+  ///
+  /// Adds the serialized row batch pointed to by 'request' and 'rpc_context' to the
+  /// receiver identified by the fragment instance id, dest node id and sender id
+  /// specified in 'request'. If the receiver is not yet ready, the processing of
+  /// 'request' is deferred until the recvr is ready, or is timed out. If the receiver
+  /// has already been torn-down (within the last STREAM_EXPIRATION_TIME_MS), the RPC
+  /// will be responded to immediately. Otherwise, the sender will be responded to when
+  /// the timeout occurs.
+  ///
+  /// 'response' is the reply to the caller and the status for deserializing the row batch
+  /// should be added to it; 'rpc_context' holds the payload of the incoming RPC calls.
+  /// It owns the memory pointed to by 'request','response' and the RPC sidecars. The
+  /// request together with the RPC sidecars make up the serialized row batch.
+  ///
+  /// If the stream would exceed its buffering limit as a result of queuing this batch,
+  /// the batch is deferred for processing later by the deserialization thread pool.
+  ///
+  /// The RPC may not be responded to by the time this function returns if the processing
+  /// is deferred.
+  ///
+  /// TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders),
+  /// so that a single sender can't flood the buffer and stall everybody else.
+  void AddData(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* rpc_context);
+
+  /// Handler for EndDataStream() RPC.
+  ///
+  /// Notifies the receiver associated with the fragment/dest_node id that the specified
+  /// sender has closed. The RPC will be responded to if the receiver is found.
+  /// Otherwise, the request will be queued in the early senders list and responded
+  /// to either when the receiver is created or when the request has timed out.
+  void CloseSender(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context);
+
+  /// Cancels all receivers registered for fragment_instance_id immediately. The
+  /// receivers will not accept any row batches after being cancelled. Any buffered
+  /// row batches will not be freed until Close() is called on the receivers.
+  void Cancel(const TUniqueId& fragment_instance_id) override;
+
+  /// Waits for the maintenance thread and the deserialization thread pool to finish.
+  ~KrpcDataStreamMgr();
+
+ private:
+  friend class KrpcDataStreamRecvr;
+
+  /// A task for the deserialization threads to work on. The fields identify
+  /// the target receiver's sender queue.
+  struct DeserializeTask {
+    /// The receiver's fragment instance id.
+    TUniqueId finst_id;
+
+    /// The plan node id of the exchange node owning the receiver.
+    PlanNodeId dest_node_id;
+
+    /// Sender id used for identifying the sender queue for merging exchange.
+    int sender_id;
+  };
+
+  /// Set of threads which deserialize buffered row batches, and deliver them to their
+  /// receivers. Used only if RPCs were deferred when their channel's batch queue was
+  /// full or if the receiver was not yet prepared.
+  ThreadPool<DeserializeTask> deserialize_pool_;
+
+  /// Periodically, respond to all senders that have waited for too long for their
+  /// receivers to show up.
+  std::unique_ptr<Thread> maintenance_thread_;
+
+  /// Used to notify maintenance_thread_ that it should exit.
+  Promise<bool> shutdown_promise_;
+
+  /// Current number of senders waiting for a receiver to register
+  IntGauge* num_senders_waiting_;
+
+  /// Total number of senders that have ever waited for a receiver to register
+  IntCounter* total_senders_waited_;
+
+  /// Total number of senders that timed-out waiting for a receiver to register
+  IntCounter* num_senders_timedout_;
+
+  /// protects all fields below
+  boost::mutex lock_;
+
+  /// Map from hash value of fragment instance id/node id pair to stream receivers;
+  /// Ownership of the stream recvr is shared between this instance and the caller of
+  /// CreateRecvr().
+  /// we don't want to create a map<pair<TUniqueId, PlanNodeId>, KrpcDataStreamRecvr*>,
+  /// because that requires a bunch of copying of ids for lookup
+  typedef
+      boost::unordered_multimap<uint32_t, std::shared_ptr<KrpcDataStreamRecvr>> RecvrMap;
+  RecvrMap receiver_map_;
+
+  /// (Fragment instance id, Plan node id) pair that uniquely identifies a stream.
+  typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
+
+  /// Less-than ordering for RecvrIds.
+  struct ComparisonOp {
+    bool operator()(const RecvrId& a, const RecvrId& b) const {
+      if (a.first.hi < b.first.hi) {
+        return true;
+      } else if (a.first.hi > b.first.hi) {
+        return false;
+      } else if (a.first.lo < b.first.lo) {
+        return true;
+      } else if (a.first.lo > b.first.lo) {
+        return false;
+      }
+      return a.second < b.second;
+    }
+  };
+
+  /// An ordered set of receiver IDs so that we can easily find all receiver IDs belonging
+  /// to a fragment instance (by calling std::set::lower_bound(finst_id, 0) to find the
+  /// first entry and iterating until the entry's finst_id doesn't match).
+  ///
+  /// There is one entry in fragment_recvr_set_ for every entry in receiver_map_.
+  typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
+  FragmentRecvrSet fragment_recvr_set_;
+
+  /// List of waiting senders that need to be processed when a receiver is created.
+  /// Access is only thread-safe when lock_ is held.
+  struct EarlySendersList {
+    /// Queue of contexts for senders which called AddData() before the receiver was
+    /// set up.
+    std::vector<std::unique_ptr<TransmitDataCtx>> waiting_sender_ctxs;
+
+    /// Queue of contexts for senders that called EndDataStream() before the receiver was
+    /// set up.
+    std::vector<std::unique_ptr<EndDataStreamCtx>> closed_sender_ctxs;
+
+    /// Monotonic time of arrival of the first sender in ms. Used to notify senders when
+    /// they have waited too long.
+    int64_t arrival_time;
+
+    EarlySendersList() : arrival_time(MonotonicMillis()) { }
+
+    /// Defining the move constructor as vectors of unique_ptr are not copyable.
+    EarlySendersList(EarlySendersList&& other)
+      : waiting_sender_ctxs(move(other.waiting_sender_ctxs)),
+        closed_sender_ctxs(move(other.closed_sender_ctxs)),
+        arrival_time(other.arrival_time) { }
+
+    /// Defining the move operator= as vectors of unique_ptr are not copyable.
+    EarlySendersList& operator=(EarlySendersList&& other) {
+      waiting_sender_ctxs = move(other.waiting_sender_ctxs);
+      closed_sender_ctxs = move(other.closed_sender_ctxs);
+      arrival_time = other.arrival_time;
+      return *this;
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(EarlySendersList);
+  };
+
+  /// Map from stream (which identifies a receiver) to a list of senders that should be
+  /// processed when that receiver arrives.
+  ///
+  /// Entries are removed from early_senders_map_ when either a) a receiver is created
+  /// or b) the Maintenance() thread detects that the longest-waiting sender has been
+  /// waiting for more than FLAGS_datastream_sender_timeout_ms.
+  typedef boost::unordered_map<RecvrId, EarlySendersList> EarlySendersMap;
+  EarlySendersMap early_senders_map_;
+
+  /// Map from monotonic time, in ms, that a stream should be evicted from
+  /// closed_stream_cache_ to its RecvrId. Used to evict old streams from cache
+  /// efficiently. Using multimap as there may be multiple streams with the same
+  /// eviction time.
+  typedef std::multimap<int64_t, RecvrId> ClosedStreamMap;
+  ClosedStreamMap closed_stream_expirations_;
+
+  /// Cache of recently closed RecvrIds. Used to allow straggling senders to fail fast by
+  /// checking this cache, rather than waiting for the missed-receiver timeout to elapse.
+  boost::unordered_set<RecvrId> closed_stream_cache_;
+
+  /// Adds a TransmitData() RPC request to the early senders list. Used for storing
+  /// TransmitData() RPC requests which arrive before the receiver finishes preparing.
+  void AddEarlySender(const TUniqueId& fragment_instance_id,
+      const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* context);
+
+  /// Adds an EndDataStream() RPC request to the early senders list. Used for storing
+  /// EndDataStream() RPC requests which arrive before the receiver finishes preparing.
+  void AddEarlyClosedSender(const TUniqueId& fragment_instance_id,
+      const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
+      kudu::rpc::RpcContext* context);
+
+  /// Enqueue 'num_requests' requests to the deserialization thread pool to drain the
+  /// deferred RPCs for the receiver with fragment instance id 'finst_id' and plan node
+  /// id 'dest_node_id'. 'sender_id' identifies the sender queue if the receiver
+  /// belongs to a merging exchange node. This may block, so no lock should be held when
+  /// calling this function.
+  void EnqueueDeserializeTask(const TUniqueId& finst_id, PlanNodeId dest_node_id,
+      int sender_id, int num_requests);
+
+  /// Worker function for deserializing a deferred RPC request stored in 'task'.
+  /// Called from the deserialization thread.
+  void DeserializeThreadFn(int thread_id, const DeserializeTask& task);
+
+  /// Return a shared_ptr to the receiver for the given fragment_instance_id/dest_node_id,
+  /// or an empty shared_ptr if not found. Must be called with lock_ already held. If the
+  /// stream was recently closed, sets *already_unregistered to true to indicate to the
+  /// caller that the stream will not be available in the future. In that case, the
+  /// returned shared_ptr will be empty.
+  std::shared_ptr<KrpcDataStreamRecvr> FindRecvr(const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, bool* already_unregistered);
+
+  /// Remove receiver for fragment_instance_id/dest_node_id from the map. Will also
+  /// cancel all the sender queues of the receiver.
+  Status DeregisterRecvr(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id);
+
+  /// Returns a hash value generated from the fragment instance id and dest node id.
+  /// The hash value is the key in the 'receiver_map_' for the receiver of
+  /// fragment_instance_id/dest_node_id.
+  uint32_t GetHashValue(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id);
 
-  [[noreturn]] void Cancel(const TUniqueId& fragment_instance_id) override;
+  /// Responds to a sender when an RPC request has timed out waiting for the receiver to
+  /// show up. 'ctx' is the encapsulated RPC request context (e.g. TransmitDataCtx).
+  template<typename ContextType, typename RequestPBType>
+  void RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx);
 
+  /// Notifies any sender that has been waiting for its receiver for more than
+  /// FLAGS_datastream_sender_timeout_ms.
+  ///
+  /// Run by maintenance_thread_.
+  void Maintenance();
 };
 
 } // namespace impala
-#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H */
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H