You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:50 UTC

[38/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc.h b/be/src/kudu/rpc/rpc.h
new file mode 100644
index 0000000..bd195dc
--- /dev/null
+++ b/be/src/kudu/rpc/rpc.h
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_H
+#define KUDU_RPC_RPC_H
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace rpc {
+
+class Messenger;
+class Rpc;
+
+// Result status of a retriable Rpc.
+//
+// TODO Consider merging this with ScanRpcStatus.
+struct RetriableRpcStatus {
+  enum Result {
+    // There was no error, i.e. the RPC was successful.
+    OK,
+
+    // The RPC got an error and it's not retriable.
+    NON_RETRIABLE_ERROR,
+
+    // The server couldn't be reached, i.e. there was a network error while
+    // reaching the replica or a DNS resolution problem.
+    SERVER_NOT_ACCESSIBLE,
+
+    // The server received the request but it was not ready to serve it right
+    // away. It might happen that the server was too busy and did not have
+    // necessary resources or information to serve the request but it
+    // anticipates it should be ready to serve the request really soon, so it's
+    // worth retrying the request at a later time.
+    SERVICE_UNAVAILABLE,
+
+    // For RPCs that are meant only for the leader of a shared resource, when the server
+    // we're interacting with is not the leader.
+    REPLICA_NOT_LEADER,
+
+    // The server doesn't know the resource we're interacting with. For instance a TabletServer
+    // is not part of the config for the tablet we're trying to write to.
+    RESOURCE_NOT_FOUND,
+
+    // The authentication token supplied with the operation was found invalid
+    // by the server. Most likely, the token has expired. If so, get a new token
+    // using client credentials and retry the operation with it.
+    INVALID_AUTHENTICATION_TOKEN,
+  };
+
+  Result result;  // Outcome category; has no default member initializer.
+  Status status;  // NOTE(review): presumably the detailed error behind 'result' -- confirm.
+};
+
+// This class picks a server among a possible set of servers serving a given resource.
+//
+// TODO Currently this only picks the leader, though it wouldn't be unfeasible to have this
+// have an enum so that it can pick any server.
+template <class Server>
+class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> {
+ public:
+  virtual ~ServerPicker() = default;
+
+  using ServerPickedCallback = Callback<void(const Status& status, Server* server)>;
+
+  // Locates the leader among the replicas serving a resource and reports the
+  // outcome through 'callback'.
+  // On success the callback receives Status::OK() and the current leader in
+  // 'server'; on failure it receives the failure reason in 'status' and a null
+  // 'server'. If no leader is picked before 'deadline', the callback receives
+  // Status::TimedOut().
+  virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) = 0;
+
+  // Records that 'server' failed or is inaccessible.
+  virtual void MarkServerFailed(Server* server, const Status& status) = 0;
+
+  // Records that 'replica' is not the leader of the config serving the resource.
+  virtual void MarkReplicaNotLeader(Server* replica) = 0;
+
+  // Records that 'replica' does not serve the resource we want.
+  virtual void MarkResourceNotFound(Server* replica) = 0;
+};
+
+// Provides utilities for retrying failed RPCs.
+//
+// All RPCs should use HandleResponse() to retry certain generic errors.
+class RpcRetrier {
+ public:
+  RpcRetrier(MonoTime deadline, std::shared_ptr<rpc::Messenger> messenger)
+      : attempt_num_(1),
+        deadline_(deadline),
+        messenger_(std::move(messenger)) {
+    if (deadline_.Initialized()) {  // An uninitialized deadline is not forwarded.
+      controller_.set_deadline(deadline_);
+    }
+    controller_.Reset();
+  }
+
+  // Tries to handle a failed RPC.
+  //
+  // If it was handled (e.g. scheduled for retry in the future), returns
+  // true. In this case, callers should ensure that 'rpc' remains alive.
+  //
+  // Otherwise, returns false and writes the controller status to
+  // 'out_status'.
+  bool HandleResponse(Rpc* rpc, Status* out_status);
+
+  // Retries an RPC at some point in the near future. If 'why_status' is not OK,
+  // records it as the most recent error causing the RPC to retry. This is
+  // reported to the caller eventually if the RPC never succeeds.
+  //
+  // If the RPC's deadline expires, the callback will fire with a timeout
+  // error when the RPC comes up for retrying. This is true even if the
+  // deadline has already expired at the time that DelayedRetry() was called.
+  //
+  // Callers should ensure that 'rpc' remains alive.
+  void DelayedRetry(Rpc* rpc, const Status& why_status);
+
+  RpcController* mutable_controller() { return &controller_; }
+  const RpcController& controller() const { return controller_; }
+
+  const MonoTime& deadline() const { return deadline_; }
+
+  const std::shared_ptr<Messenger>& messenger() const {
+    return messenger_;
+  }
+
+  int attempt_num() const { return attempt_num_; }
+
+  // Called when an RPC comes up for retrying. Actually sends the RPC.
+  void DelayedRetryCb(Rpc* rpc, const Status& status);
+
+ private:
+  // The next sent RPC will be the nth attempt (indexed from 1).
+  int attempt_num_;
+
+  // If the remote end is busy, the RPC will be retried (with a small
+  // delay) until this deadline is reached.
+  //
+  // May be uninitialized.
+  MonoTime deadline_;
+
+  // Messenger to use when sending the RPC.
+  std::shared_ptr<Messenger> messenger_;
+
+  // RPC controller to use when sending the RPC.
+  RpcController controller_;
+
+  // In case any retries have already happened, remembers the last error.
+  // Errors from the server take precedence over timeout errors.
+  Status last_error_;
+
+  DISALLOW_COPY_AND_ASSIGN(RpcRetrier);
+};
+
+// An in-flight remote procedure call to some server.
+class Rpc {
+ public:
+  // Builds the RPC together with its retrier; 'deadline' and 'messenger' are
+  // handed straight to the RpcRetrier.
+  Rpc(const MonoTime& deadline, std::shared_ptr<rpc::Messenger> messenger)
+      : retrier_(deadline, std::move(messenger)) {}
+
+  virtual ~Rpc() = default;
+
+  // Sends the RPC to the remote end without blocking.
+  //
+  // Implementations are expected to pass SendRpcCb() (below) as the callback.
+  virtual void SendRpc() = 0;
+
+  // Human-readable description of this RPC.
+  virtual std::string ToString() const = 0;
+
+  // The retrier's current attempt number. The retrier starts counting at
+  // one, so the result is expected to be at least one.
+  int num_attempts() const { return retrier_.attempt_num(); }
+
+ protected:
+  RpcRetrier* mutable_retrier() { return &retrier_; }
+  const RpcRetrier& retrier() const { return retrier_; }
+
+ private:
+  friend class RpcRetrier;
+
+  // Completion callback for SendRpc(). A non-OK 'status' means something
+  // failed before the RPC was sent.
+  virtual void SendRpcCb(const Status& status) = 0;
+
+  // Drives retries of failed RPCs.
+  RpcRetrier retrier_;
+
+  DISALLOW_COPY_AND_ASSIGN(Rpc);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_RPC_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc
new file mode 100644
index 0000000..97da445
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.cc
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_context.h"
+
+#include <memory>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+#include <google/protobuf/message.h>
+
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+using google::protobuf::Message;
+using kudu::pb_util::SecureDebugString;
+using std::string;
+using std::unique_ptr;
+
+namespace kudu {
+
+class Slice;
+
+namespace rpc {
+
+RpcContext::RpcContext(InboundCall *call,
+                       const google::protobuf::Message *request_pb,
+                       google::protobuf::Message *response_pb,
+                       scoped_refptr<ResultTracker> result_tracker)
+  : call_(CHECK_NOTNULL(call)),  // A null 'call' fails the check.
+    request_pb_(request_pb),  // Stored in a gscoped_ptr member.
+    response_pb_(response_pb),  // Stored in a gscoped_ptr member.
+    result_tracker_(std::move(result_tracker)) {
+  VLOG(4) << call_->remote_method().service_name() << ": Received RPC request for "
+          << call_->ToString() << ":" << std::endl << SecureDebugString(*request_pb_);
+  TRACE_EVENT_ASYNC_BEGIN2("rpc_call", "RPC", this,  // Ended by the Respond*() methods.
+                           "call", call_->ToString(),
+                           "request", pb_util::PbTracer::TracePb(*request_pb_));
+}
+
+RpcContext::~RpcContext() {  // No explicit work; members run their own destructors.
+}
+
+void RpcContext::RespondSuccess() {
+  if (AreResultsTracked()) {  // Tracked calls are answered through the ResultTracker.
+    result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
+                                                response_pb_.get());
+    return;
+  }
+  VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success response for "
+          << call_->ToString() << ":" << std::endl << SecureDebugString(*response_pb_);
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(*response_pb_),
+                         "trace", trace()->DumpToString());
+  call_->RespondSuccess(*response_pb_);
+  delete this;  // Untracked path: the context destroys itself after responding.
+}
+
+void RpcContext::RespondNoCache() {
+  if (AreResultsTracked()) {  // Tracked calls are failed through the ResultTracker.
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    response_pb_.get());
+    return;
+  }
+  VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+          << call_->ToString() << ": " << SecureDebugString(*response_pb_);
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(*response_pb_),
+                         "trace", trace()->DumpToString());
+  // Counter-intuitive but deliberate: the error lives inside the response protobuf,
+  // so the call itself is answered with RespondSuccess() rather than RespondFailure().
+  call_->RespondSuccess(*response_pb_);
+  delete this;  // Untracked path: the context destroys itself after responding.
+}
+
+void RpcContext::RespondFailure(const Status& status) {
+  RespondRpcFailure(ErrorStatusPB::ERROR_APPLICATION, status);  // Fixed error code.
+}
+
+void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
+  if (AreResultsTracked()) {  // Tracked calls are failed through the ResultTracker.
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    err, status);
+    return;
+  }
+  VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+          << call_->ToString() << ": " << status.ToString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "status", status.ToString(),
+                         "trace", trace()->DumpToString());
+  call_->RespondFailure(err, status);
+  delete this;  // Untracked path: the context destroys itself after responding.
+}
+
+void RpcContext::RespondApplicationError(int error_ext_id, const std::string& message,
+                                         const Message& app_error_pb) {
+  if (AreResultsTracked()) {  // Tracked calls are failed through the ResultTracker.
+    result_tracker_->FailAndRespond(call_->header().request_id(),
+                                    error_ext_id, message, app_error_pb);
+    return;
+  }
+  if (VLOG_IS_ON(4)) {  // Build the error protobuf only when it will actually be logged.
+    ErrorStatusPB err_pb;
+    InboundCall::ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err_pb);
+    VLOG(4) << call_->remote_method().service_name()
+            << ": Sending application error response for " << call_->ToString()
+            << ":" << std::endl << SecureDebugString(err_pb);
+  }
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(app_error_pb),
+                         "trace", trace()->DumpToString());
+  call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+  delete this;  // Untracked path: the context destroys itself after responding.
+}
+
+const rpc::RequestIdPB* RpcContext::request_id() const {  // nullptr if the header has none.
+  return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
+}
+
+size_t RpcContext::GetTransferSize() const {  // Forwards to the inbound call.
+  return call_->GetTransferSize();
+}
+
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  return call_->AddOutboundSidecar(std::move(car), idx);  // 'car' is moved into the call.
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {  // Forwards to the inbound call.
+  return call_->GetInboundSidecar(idx, slice);
+}
+
+const RemoteUser& RpcContext::remote_user() const {  // Forwards to the inbound call.
+  return call_->remote_user();
+}
+
+bool RpcContext::is_confidential() const {  // Answered by the call's connection.
+  return call_->connection()->is_confidential();
+}
+
+void RpcContext::DiscardTransfer() {  // Forwards to the inbound call.
+  call_->DiscardTransfer();
+}
+
+const Sockaddr& RpcContext::remote_address() const {  // Forwards to the inbound call.
+  return call_->remote_address();
+}
+
+std::string RpcContext::requestor_string() const {  // "<remote user> at <remote address>".
+  return call_->remote_user().ToString() + " at " +
+    call_->remote_address().ToString();
+}
+
+std::string RpcContext::method_name() const {  // Taken from the call's remote method.
+  return call_->remote_method().method_name();
+}
+
+std::string RpcContext::service_name() const {  // Taken from the call's remote method.
+  return call_->remote_method().service_name();
+}
+
+MonoTime RpcContext::GetClientDeadline() const {  // Forwards to the inbound call.
+  return call_->GetClientDeadline();
+}
+
+MonoTime RpcContext::GetTimeReceived() const {  // Forwards to the inbound call.
+  return call_->GetTimeReceived();
+}
+
+Trace* RpcContext::trace() {  // The trace object belongs to the inbound call.
+  return call_->trace();
+}
+
+void RpcContext::Panic(const char* filepath, int line_number, const string& message) {
+  // Use the LogMessage class directly so that the log messages appear to come from
+  // the line of code which caused the panic (filepath:line_number), not this code.
+#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream()
+#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream()
+
+  MY_ERROR << "Panic handling " << call_->ToString() << ": " << message;
+  MY_ERROR << "Request:\n" << SecureDebugString(*request_pb_);
+  Trace* t = trace();
+  if (t) {  // The trace is dumped only when the call has one.
+    MY_ERROR << "RPC trace:";
+    t->Dump(&MY_ERROR, true);
+  }
+  MY_FATAL << "Exiting due to panic.";  // Declared noreturn: expected to end the process.
+
+#undef MY_ERROR
+#undef MY_FATAL
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.h b/be/src/kudu/rpc/rpc_context.h
new file mode 100644
index 0000000..c729d5e
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.h
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTEXT_H
+#define KUDU_RPC_RPC_CONTEXT_H
+
+#include <memory>
+#include <stddef.h>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Slice;
+class Sockaddr;
+class Trace;
+
+namespace rpc {
+
+class InboundCall;
+class RemoteUser;
+class ResultTracker;
+class RpcSidecar;
+
+// Panics through 'rpc_context' when it is non-null, otherwise LOG(FATAL)s 'message'.
+// Both arguments are parenthesized so that compound expressions (e.g. a ternary)
+// expand correctly. 'rpc_context' is evaluated twice: avoid side effects in it.
+#define PANIC_RPC(rpc_context, message) \
+  do { \
+    if (rpc_context) { (rpc_context)->Panic(__FILE__, __LINE__, (message)); } \
+    else { LOG(FATAL) << (message); } \
+  } while (0)
+
+// The context provided to a generated ServiceIf. This provides
+// methods to respond to the RPC. In the future, this will also
+// include methods to access information about the caller: e.g
+// authentication info, tracing info, and cancellation status.
+//
+// This is the server-side analogue to the RpcController class.
+class RpcContext {
+ public:
+  // Create an RpcContext (called only from generated code; not a public API).
+  // The request and response protobufs are stored in gscoped_ptr members.
+  RpcContext(InboundCall *call,
+             const google::protobuf::Message *request_pb,
+             google::protobuf::Message *response_pb,
+             scoped_refptr<ResultTracker> result_tracker);
+
+  ~RpcContext();
+
+  // Return the trace buffer for this call.
+  Trace* trace();
+
+  // Send a response to the call. The service may call this method
+  // before or after returning from the original handler method,
+  // and it may call this method from a different thread.
+  //
+  // The response should be prepared already in the response PB pointer
+  // which was passed to the handler method.
+  //
+  // After this method returns, this RpcContext object is destroyed. The request
+  // and response protobufs are also destroyed.
+  void RespondSuccess();
+
+  // Like the above, but doesn't store the results of the service call, if results
+  // are being tracked.
+  // Used in cases where a call-specific error was set on the response protobuf:
+  // the call should be considered failed, thus results shouldn't be cached.
+  void RespondNoCache();
+
+  // Respond with an error to the client. This sends back an error with the code
+  // ERROR_APPLICATION. Because there is no more specific error code passed back
+  // to the client, most applications should create a custom error PB extension
+  // and use RespondApplicationError(...) below. This method should only be used
+  // for unexpected errors where the server doesn't expect the client to do any
+  // more advanced handling.
+  //
+  // After this method returns, this RpcContext object is destroyed. The request
+  // and response protobufs are also destroyed.
+  void RespondFailure(const Status &status);
+
+  // Respond with an RPC-level error. This typically manifests to the client as
+  // a remote error, one whose handling is agnostic to the particulars of the
+  // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE
+  // usually cause the client to retry the RPC at a later time.
+  //
+  // After this method returns, this RpcContext object is destroyed. The request
+  // and response protobufs are also destroyed.
+  void RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status);
+
+  // Respond with an application-level error. This causes the caller to get a
+  // RemoteError status with the provided string message. Additionally, a
+  // service-specific error extension is passed back to the client. The
+  // extension must be registered with the ErrorStatusPB protobuf. For
+  // example:
+  //
+  //   message MyServiceError {
+  //     extend kudu.rpc.ErrorStatusPB {
+  //       optional MyServiceError my_service_error_ext = 101;
+  //     }
+  //     // Add any extra fields or status codes you want to pass back to
+  //     // the client here.
+  //     required string extra_error_data = 1;
+  //   }
+  //
+  // NOTE: the numeric '101' above must be an integer of at least 101 (TODO confirm
+  // the exact lower bound) and must be unique across your code base.
+  //
+  // Given the above definition in your service protobuf file, you would
+  // use this method like:
+  //
+  //   MyServiceError err;
+  //   err.set_extra_error_data("foo bar");
+  //   ctx->RespondApplicationError(MyServiceError::my_service_error_ext.number(),
+  //                                "Some error occurred", err);
+  //
+  // The client side may then retrieve the error by calling:
+  //   const MyServiceError& err_details =
+  //     controller->error_response()->GetExtension(MyServiceError::my_service_error_ext);
+  //
+  // After this method returns, this RpcContext object is destroyed. The request
+  // and response protobufs are also destroyed.
+  void RespondApplicationError(int error_ext_id, const std::string& message,
+                               const google::protobuf::Message& app_error_pb);
+
+
+  // Adds an RpcSidecar to the response. This is the preferred method for
+  // transferring large amounts of binary data, because this avoids additional
+  // copies made by serializing the protobuf.
+  //
+  // Assumes no changes to the sidecar's data are made after insertion.
+  //
+  // Upon success, writes the index of the sidecar (necessary to be retrieved
+  // later) to 'idx'. Call may fail if all sidecars have already been used
+  // by the RPC response.
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  // Fills 'slice' with a sidecar sent by the client. Returns an error if 'idx' is out
+  // of bounds.
+  Status GetInboundSidecar(int idx, Slice* slice);
+
+  // Return the identity of remote user who made this call.
+  const RemoteUser& remote_user() const;
+
+  // Whether it's OK to pass confidential information between the client and the
+  // server in the context of the RPC call being handled. In the real world, this
+  // translates into properties of the connection between the client and the
+  // server. For example, this method returns 'true' for a call over an
+  // encrypted connection.
+  bool is_confidential() const;
+
+  // Discards the memory associated with the inbound call's payload. All previously
+  // obtained sidecar slices will be invalidated by this call. It is an error to call
+  // GetInboundSidecar() after this method. request_pb() remains valid.
+  // This is useful in the case where the server wishes to delay responding to an RPC
+  // (perhaps to control the rate of RPC requests), but knows that the RPC payload itself
+  // won't be processed any further.
+  void DiscardTransfer();
+
+  // Return the remote IP address and port which sent the current RPC call.
+  const Sockaddr& remote_address() const;
+
+  // A string identifying the requestor -- both the user info and the IP address.
+  // Suitable for use in log messages.
+  std::string requestor_string() const;
+
+  // Return the name of the RPC service method being called.
+  std::string method_name() const;
+
+  // Return the name of the RPC service being called.
+  std::string service_name() const;
+
+  const google::protobuf::Message *request_pb() const { return request_pb_.get(); }
+  google::protobuf::Message *response_pb() const { return response_pb_.get(); }
+
+  // Return an upper bound on the client timeout deadline. This does not
+  // account for transmission delays between the client and the server.
+  // If the client did not specify a deadline, returns MonoTime::Max().
+  MonoTime GetClientDeadline() const;
+
+  // Return the time when the inbound call was received.
+  MonoTime GetTimeReceived() const;
+
+  // Whether the results of this RPC are tracked with a ResultTracker.
+  // If this returns true, both result_tracker() and request_id() should return non-null results.
+  bool AreResultsTracked() const { return result_tracker_.get() != nullptr; }
+
+  // Returns this call's result tracker, if it is set.
+  const scoped_refptr<ResultTracker>& result_tracker() const {
+    return result_tracker_;
+  }
+
+  // Returns this call's request id, or nullptr if it is not set.
+  const rpc::RequestIdPB* request_id() const;
+
+  // Returns the size of the transfer buffer that backs 'call_'. If the
+  // transfer buffer no longer exists (e.g. GetTransferSize() is called after
+  // DiscardTransfer()), returns 0.
+  size_t GetTransferSize() const;
+
+  // Panic the server. This logs a fatal error with the given message, and
+  // also includes the current RPC request, requestor, trace information, etc,
+  // to make it easier to debug.
+  //
+  // Call this via the PANIC_RPC() macro.
+  void Panic(const char* filepath, int line_number, const std::string& message)
+    __attribute__((noreturn));
+
+ private:
+  friend class ResultTracker;
+  InboundCall* const call_;  // Checked non-null by the constructor.
+  const gscoped_ptr<const google::protobuf::Message> request_pb_;  // Owned.
+  const gscoped_ptr<google::protobuf::Message> response_pb_;  // Owned.
+  scoped_refptr<ResultTracker> result_tracker_;  // Null when results are not tracked.
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.cc b/be/src/kudu/rpc/rpc_controller.cc
new file mode 100644
index 0000000..77c7ca4
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.cc
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_controller.h"
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/slice.h"
+
+
+using std::unique_ptr;
+using strings::Substitute;
+namespace kudu {
+
+namespace rpc {
+
+RpcController::RpcController()
+    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS), messenger_(nullptr) {
+  DVLOG(4) << "RpcController " << this << " constructed";  // Logs the object's address.
+}
+
+RpcController::~RpcController() {
+  DVLOG(4) << "RpcController " << this << " destroyed";  // Logging only; no explicit cleanup.
+}
+
+void RpcController::Swap(RpcController* other) {
+  // Cannot swap RPC controllers while they are in-flight: an attached call must be finished.
+  if (call_) {
+    CHECK(finished());
+  }
+  if (other->call_) {
+    CHECK(other->finished());
+  }
+  // NOTE(review): lock_ is not taken and messenger_/request_id_/features are not swapped -- confirm.
+  std::swap(outbound_sidecars_, other->outbound_sidecars_);
+  std::swap(outbound_sidecars_total_bytes_, other->outbound_sidecars_total_bytes_);
+  std::swap(timeout_, other->timeout_);
+  std::swap(credentials_policy_, other->credentials_policy_);
+  std::swap(call_, other->call_);
+}
+
+void RpcController::Reset() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (call_) {  // An attached call must have finished before the controller is reused.
+    CHECK(finished());
+  }
+  call_.reset();
+  required_server_features_.clear();
+  credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;  // Same default as the constructor.
+  messenger_ = nullptr;
+  outbound_sidecars_total_bytes_ = 0;  // NOTE(review): outbound_sidecars_/timeout_ kept -- confirm.
+}
+
+bool RpcController::finished() const {
+  if (!call_) {  // No call attached: reported as not finished.
+    return false;
+  }
+  return call_->IsFinished();
+}
+
+bool RpcController::negotiation_failed() const {
+  if (!call_) {  // No call attached: no negotiation can have failed.
+    return false;
+  }
+  DCHECK(finished());
+  return call_->IsNegotiationError();
+}
+
+Status RpcController::status() const {
+  if (!call_) {  // No call attached: nothing has failed, so report OK.
+    return Status::OK();
+  }
+  return call_->status();
+}
+
+const ErrorStatusPB* RpcController::error_response() const {
+  if (!call_) {  // No call attached: there is no error protobuf to expose.
+    return nullptr;
+  }
+  return call_->error_pb();
+}
+
+Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
+  return call_->call_response_->GetSidecar(idx, sidecar);  // NOTE(review): call_ unchecked here.
+}
+
+void RpcController::set_timeout(const MonoDelta& timeout) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  DCHECK(!call_ || call_->state() == OutboundCall::READY);  // Only before the call leaves READY.
+  timeout_ = timeout;
+}
+
+void RpcController::set_deadline(const MonoTime& deadline) {
+  set_timeout(deadline - MonoTime::Now());  // Stored as a timeout relative to the current time.
+}
+
+void RpcController::SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id) {
+  request_id_ = std::move(request_id);  // Takes ownership; replaces any previously set id.
+}
+
+bool RpcController::has_request_id() const {  // True once a non-null request id is set.
+  return request_id_ != nullptr;
+}
+
+const RequestIdPB& RpcController::request_id() const {
+  DCHECK(has_request_id());  // Callers must check has_request_id() first.
+  return *request_id_;
+}
+
+void RpcController::RequireServerFeature(uint32_t feature) {
+  DCHECK(!call_ || call_->state() == OutboundCall::READY);  // Only before the call leaves READY.
+  required_server_features_.insert(feature);  // NOTE(review): lock_ not held here -- confirm.
+}
+
+MonoDelta RpcController::timeout() const {
+  std::lock_guard<simple_spinlock> l(lock_);  // Same lock that set_timeout() writes under.
+  return timeout_;
+}
+
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {  // Count limit.
+    return Status::RuntimeError("All available sidecars already used");
+  }
+  int64_t sidecar_bytes = car->AsSlice().size();  // NOTE(review): 'car' assumed non-null.
+  if (outbound_sidecars_total_bytes_ >  // Size limit, phrased as "total > limit - new".
+      TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
+    return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
+        static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
+        TransferLimits::kMaxTotalSidecarBytes));
+  }
+
+  outbound_sidecars_.emplace_back(std::move(car));
+  outbound_sidecars_total_bytes_ += sidecar_bytes;
+  DCHECK_GE(outbound_sidecars_total_bytes_, 0);
+  *idx = outbound_sidecars_.size() - 1;  // Index of the sidecar that was just appended.
+  return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+  DCHECK(call_ != nullptr);  // A call must already be attached.
+  call_->SetRequestPayload(req, std::move(outbound_sidecars_));  // Sidecars move into the call.
+}
+
+void RpcController::Cancel() {
+  DCHECK(call_);  // Requires an attached call...
+  DCHECK(messenger_);  // ...and a messenger to queue the cancellation on.
+  messenger_->QueueCancellation(call_);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.h b/be/src/kudu/rpc/rpc_controller.h
new file mode 100644
index 0000000..aa61d83
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.h
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTROLLER_H
+#define KUDU_RPC_RPC_CONTROLLER_H
+
+#include <cstdint>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Slice;
+
+namespace rpc {
+
+class ErrorStatusPB;
+class Messenger;
+class OutboundCall;
+class RequestIdPB;
+class RpcSidecar;
+
+// Authentication credentials policy for outbound RPCs. Some RPC methods
+// (e.g. MasterService::ConnectToMaster) behave differently depending on the
+// type of credentials used for authentication when establishing the connection.
+// The client expecting some particular results from the call should specify
+// the required policy on a per-call basis using RpcController. By default,
+// RpcController uses ANY_CREDENTIALS.
+enum class CredentialsPolicy {
+  // It's acceptable to use authentication credentials of any type, primary or
+  // secondary ones.
+  ANY_CREDENTIALS,
+
+  // Only primary credentials are acceptable. Primary credentials are Kerberos
+  // tickets and TLS certificates. Secondary credentials are authentication
+  // tokens: they are 'derived' in the sense that it's possible to acquire them
+  // using 'primary' credentials.
+  PRIMARY_CREDENTIALS,
+};
+
+// Controller for managing properties of a single RPC call, on the client side.
+//
+// An RpcController maps to exactly one call and is not thread-safe. The client
+// may use this class prior to sending an RPC in order to set properties such
+// as the call's timeout.
+//
+// After the call has been sent (e.g. using Proxy::AsyncRequest()) the user
+// may invoke methods on the RpcController object in order to probe the status
+// of the call.
+class RpcController {
+ public:
+  RpcController();
+  ~RpcController();
+
+  // Swap the state of the controller (including ownership of sidecars, buffers,
+  // etc) with another one.
+  void Swap(RpcController* other);
+
+  // Reset this controller so it may be used with another call.
+  // Note that this resets the required server features.
+  void Reset();
+
+  // Return true if the call has finished.
+  // A call is finished if the server has responded, or if the call
+  // has timed out.
+  bool finished() const;
+
+  // Whether the call failed due to connection negotiation error.
+  bool negotiation_failed() const;
+
+  // Return the current status of a call.
+  //
+  // A call is "OK" status until it finishes, at which point it may
+  // either remain in "OK" status (if the call was successful), or
+  // change to an error status. Error status indicates that there was
+  // some RPC-layer issue with making the call, for example, one of:
+  //
+  // * failed to establish a connection to the server
+  // * the server was too busy to handle the request
+  // * the server was unable to interpret the request (eg due to a version
+  //   mismatch)
+  // * a network error occurred which caused the connection to be torn
+  //   down
+  // * the call timed out
+  Status status() const;
+
+  // If status() returns a RemoteError object, then this function returns
+  // the error response provided by the server. Service implementors may
+  // use protobuf Extensions to add application-specific data to this PB.
+  //
+  // If Status was not a RemoteError, this returns NULL.
+  // The returned pointer is only valid as long as the controller object.
+  const ErrorStatusPB* error_response() const;
+
+  // Set the timeout for the call to be made with this RPC controller.
+  //
+  // The configured timeout applies to the entire time period between
+  // the AsyncRequest() method call and getting a response. For example,
+  // if it takes too long to establish a connection to the remote host,
+  // or to DNS-resolve the remote host, those will be accounted as part
+  // of the timeout period.
+  //
+  // Timeouts must be set prior to making the request -- the timeout may
+  // not currently be adjusted for an already-sent call.
+  //
+  // Using an uninitialized timeout will result in a call which never
+  // times out (not recommended!)
+  void set_timeout(const MonoDelta& timeout);
+
+  // Like a timeout, but based on a fixed point in time instead of a delta.
+  //
+  // Using an uninitialized deadline means the call won't time out.
+  void set_deadline(const MonoTime& deadline);
+
+  // Allows setting the request id for the next request sent to the server.
+  // A request id allows the server to identify each request sent by the client uniquely,
+  // in some cases even when sent to multiple servers, enabling exactly once semantics.
+  void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
+
+  // Returns whether a request id has been set on RPC header.
+  bool has_request_id() const;
+
+  // Returns the currently set request id.
+  // When the request is sent to the server, it gets "moved" from RpcController
+  // so an absence of a request id after send doesn't mean one wasn't sent.
+  // REQUIRES: the controller has a request ID set.
+  const RequestIdPB& request_id() const;
+
+  // Add a requirement that the server side must support a feature with the
+  // given identifier. The set of required features is sent to the server
+  // with the RPC call, and if any required feature is not supported, the
+  // call will fail with a NotSupported() status.
+  //
+  // This can be used when an RPC call changes in a way that is protobuf-compatible,
+  // but for which it would not be appropriate for the server to simply ignore
+  // an added field. For example, consider an API call like:
+  //
+  //   message DeleteAccount {
+  //     optional string username = 1;
+  //     optional bool dry_run = 2; // ADDED LATER!
+  //   }
+  //
+  // In this case, if a new client which supports the 'dry_run' flag sends the RPC
+  // to an old server, the old server will simply ignore the unrecognized parameter,
+  // with highly problematic results. To solve this problem, the new version can
+  // add a feature flag:
+  //
+  //   In .proto file
+  //   ----------------
+  //   enum MyFeatureFlags {
+  //     UNKNOWN = 0;
+  //     DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1;
+  //   }
+  //
+  //   In client code:
+  //   ---------------
+  //   if (dry_run) {
+  //     rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN);
+  //     req.set_dry_run(true);
+  //   }
+  //
+  // This has the effect of (a) maintaining compatibility when dry_run is not specified
+  // and (b) rejecting the RPC with a "NotSupported" error when it is.
+  //
+  // NOTE: 'feature' is an int rather than an enum type because each service
+  // must define its own enum of supported features, and protobuf doesn't support
+  // any ability to 'extend' enum types. Implementers should define an enum in the
+  // service's protobuf definition as shown above.
+  void RequireServerFeature(uint32_t feature);
+
+  // Returns a const reference to the set of features added so far via
+  // RequireServerFeature().
+  const std::unordered_set<uint32_t>& required_server_features() const {
+    return required_server_features_;
+  }
+
+  // Return the configured timeout.
+  MonoDelta timeout() const;
+
+  CredentialsPolicy credentials_policy() const {
+    return credentials_policy_;
+  }
+
+  void set_credentials_policy(CredentialsPolicy policy) {
+    credentials_policy_ = policy;
+  }
+
+  // Fills the 'sidecar' parameter with the slice pointing to the i-th
+  // sidecar upon success.
+  //
+  // Should only be called if the call's finished, but the controller has not
+  // been Reset().
+  //
+  // May fail if index is invalid.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Adds a sidecar to the outbound request. The index of the sidecar is written to
+  // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+  // to this request. Also returns an error if the total size of all sidecars would
+  // exceed TransferLimits::kMaxTotalSidecarBytes.
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  // Cancel the call associated with the RpcController. This function should only be
+  // called when there is an outstanding outbound call. It's always safe to call
+  // Cancel() after you've sent a call, so long as you haven't called Reset() yet.
+  // Caller is not responsible for synchronization between cancellation and the
+  // callback. (i.e. the callback may or may not be invoked yet when Cancel()
+  // is called).
+  //
+  // Cancellation is "best effort" - i.e. it's still possible the callback passed
+  // to the call will be fired with a success status. If cancellation succeeds,
+  // the callback will be invoked with an Aborted status. Cancellation is asynchronous
+  // so the callback will still be invoked from the reactor thread.
+  void Cancel();
+
+ private:
+  friend class OutboundCall;
+  friend class Proxy;
+
+  // Set the outbound call_'s request parameter, and transfer ownership of
+  // outbound_sidecars_ to call_ in preparation for serialization.
+  void SetRequestParam(const google::protobuf::Message& req);
+
+  // Set the messenger which contains the reactor thread handling the outbound call.
+  void SetMessenger(Messenger* messenger) { messenger_ = messenger; }
+
+  MonoDelta timeout_;  // value returned by timeout()
+  std::unordered_set<uint32_t> required_server_features_;
+
+  // RPC authentication policy for outbound calls.
+  CredentialsPolicy credentials_policy_;
+
+  mutable simple_spinlock lock_;  // held by timeout() while it reads timeout_
+
+  // The id of this request.
+  // Ownership is transferred to OutboundCall once the call is sent.
+  std::unique_ptr<RequestIdPB> request_id_;
+
+  // The messenger which contains the reactor thread for 'call_'.
+  // Set only when 'call_' is set.
+  Messenger* messenger_;
+
+  // Once the call is sent, it is tracked here.
+  std::shared_ptr<OutboundCall> call_;
+
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  // Total size of sidecars in outbound_sidecars_. This is limited to a maximum
+  // of TransferLimits::kMaxTotalSidecarBytes.
+  int32_t outbound_sidecars_total_bytes_ = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(RpcController);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_header.proto b/be/src/kudu/rpc/rpc_header.proto
new file mode 100644
index 0000000..1d55b6a
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_header.proto
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+
+option optimize_for = SPEED;
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu.rpc";
+
+import "google/protobuf/descriptor.proto";
+import "kudu/security/token.proto";
+import "kudu/util/pb_util.proto";
+
+// The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase.
+// See the following for reference on those other protocols:
+//  - https://issues.apache.org/jira/browse/HBASE-7898
+//  - https://issues.apache.org/jira/browse/HADOOP-8990
+//
+// For a description of the Kudu protocol, see 'README' in this directory.
+
+// User information. Carried in ConnectionContextPB.DEPRECATED_user_info on connection setup.
+message UserInformationPB {
+  optional string effective_user = 1;
+  required string real_user = 2;
+}
+
+// The connection context is sent as part of the connection establishment.
+// It establishes the context for ALL RPC calls within the connection.
+// This is sent on connection setup after the connection preamble is sent
+// and SASL has been negotiated.
+// No response is sent from the server to the client.
+message ConnectionContextPB {
+  // UserInfo beyond what is determined as part of security handshake
+  // at connection time (Kerberos, tokens, etc.).
+  //
+  // DEPRECATED: No longer used in Kudu 1.1 and later.
+  // The 'real_user' should be taken from the SASL negotiation.
+  // Impersonation (effective user) was never supported, so we'll have
+  // to add that back at some point later.
+  optional UserInformationPB DEPRECATED_user_info = 2;
+
+  // If the server sends a nonce to the client during the SASL_SUCCESS
+  // negotiation step, the client is required to encode it with SASL integrity
+  // protection and return it in this field. The nonce protects the server
+  // against a Kerberos replay attack.
+  optional bytes encoded_nonce = 3 [(REDACT) = true];
+}
+
+// Features supported by the RPC system itself.
+//
+// Note that this should be used to evolve the RPC _system_, not the semantics
+// or compatibility of individual calls.
+//
+// For example, if we were to add a feature like call or response wire
+// compression in the future, we could add a flag here to indicate that the
+// client or server supports that feature. Optional features which may safely be
+// ignored by the receiver do not need a feature flag; instead, protobuf's
+// optional fields may be used.
+enum RpcFeatureFlag {
+  UNKNOWN = 0;
+
+  // The RPC system is required to support application feature flags in the
+  // request and response headers.
+  APPLICATION_FEATURE_FLAGS = 1;
+
+  // The RPC system supports TLS protected connections. If both sides support
+  // this flag, the connection will automatically be wrapped in a TLS protected
+  // channel following a TLS handshake.
+  TLS = 2;
+
+  // If both sides advertise TLS_AUTHENTICATION_ONLY, this means that they
+  // agree that, after handshaking TLS, they will *not* wrap the connection
+  // in a TLS-protected channel. Instead, they will use TLS only for its
+  // handshake-based authentication.
+  //
+  // This is currently used for loopback connections only, so that compute
+  // frameworks which schedule for locality don't pay encryption overhead.
+  TLS_AUTHENTICATION_ONLY = 3;
+};
+
+// An authentication type. This is modeled as a oneof in case any of these
+// authentication types, or any authentication types in the future, need to add
+// extra type-specific parameters during negotiation.
+message AuthenticationTypePB {
+  message Sasl {};         // The three nested messages carry no fields at present.
+  message Token {};
+  message Certificate {};
+
+  oneof type {
+    // The server and client mutually authenticate via SASL.
+    Sasl sasl = 1;
+
+    // The server authenticates the client via a signed token, and the client
+    // authenticates the server by verifying its certificate has been signed by
+    // a trusted CA.
+    //
+    // Token authentication requires the connection to be TLS encrypted.
+    Token token = 2;
+
+    // The server and client mutually authenticate by certificate.
+    //
+    // Certificate authentication requires the connection to be TLS encrypted.
+    Certificate certificate = 3;
+  }
+}
+
+// Message type passed back and forth during connection negotiation (SASL, TLS, token steps).
+message NegotiatePB {
+  enum NegotiateStep {
+    UNKNOWN        = 999;
+    NEGOTIATE      = 1;
+    SASL_SUCCESS   = 0;
+    SASL_INITIATE  = 2;
+    SASL_CHALLENGE = 3;
+    SASL_RESPONSE  = 4;
+    TLS_HANDSHAKE  = 5;
+    TOKEN_EXCHANGE = 6;
+  }
+
+  message SaslMechanism {
+    // The SASL mechanism, i.e. 'PLAIN' or 'GSSAPI'.
+    required string mechanism = 2;
+
+    // Deprecated: no longer used. Avoid reusing field numbers 1 and 5 listed below.
+    // optional string method = 1;
+    // optional bytes challenge = 5 [(REDACT) = true];
+  }
+
+  // When the client sends its NEGOTIATE step message, it sends its set of
+  // supported RPC system features. In the response to this message, the server
+  // sends back its own. This allows the two peers to agree on whether newer
+  // extensions of the RPC system may be used on this connection. We use a list
+  // of features rather than a simple version number to make it easier for the
+  // Java and C++ clients to implement features in different orders while still
+  // maintaining compatibility, as well as to simplify backporting of features
+  // out-of-order.
+  repeated RpcFeatureFlag supported_features = 1;
+
+  // The current negotiation step.
+  required NegotiateStep step  = 2;
+
+  // The SASL token, containing either the challenge during the SASL_CHALLENGE
+  // step, or the response during the SASL_RESPONSE step.
+  optional bytes token         = 3 [(REDACT) = true];
+
+  // During the TLS_HANDSHAKE step, contains the TLS handshake message.
+  optional bytes tls_handshake = 5 [(REDACT) = true];
+
+  // The tls-server-end-point channel bindings as specified in RFC 5929.  Sent
+  // from the server to the client during the SASL_SUCCESS step when the
+  // Kerberos (GSSAPI) SASL mechanism is used with TLS, in order to bind the
+  // Kerberos authenticated channel to the TLS channel. The value is integrity
+  // protected through SASL. The client is responsible for validating that the
+  // value matches the expected value.
+  optional bytes channel_bindings = 6 [(REDACT) = true];
+
+  // A random nonce sent from the server to the client during the SASL_SUCCESS
+  // step when the Kerberos (GSSAPI) SASL mechanism is used with TLS. The nonce
+  // must be sent back to the server, wrapped in SASL integrity protection, as
+  // part of the connection context.
+  optional bytes nonce = 9 [(REDACT) = true];
+
+  // During the NEGOTIATE step, contains the supported SASL mechanisms.
+  // During the SASL_INITIATE step, contains the single chosen SASL mechanism.
+  repeated SaslMechanism sasl_mechanisms = 4;
+
+  // During the client to server NEGOTIATE step, contains the supported authentication types.
+  // During the server to client NEGOTIATE step, contains the chosen authentication type.
+  repeated AuthenticationTypePB authn_types = 7;
+
+  // During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
+  optional security.SignedTokenPB authn_token = 8;
+}
+
+message RemoteMethodPB {  // Identifies an RPC method by service name and method name.
+  // Service name for the RPC layer.
+  // The client created a proxy with this service name.
+  // Example: kudu.rpc_test.CalculatorService
+  required string service_name = 1;
+
+  // Name of the RPC method.
+  required string method_name = 2;
+};
+
+// The Id of a retriable RPC, whose results should be tracked on the server (see result_tracker.h).
+// This also includes some information that is useful for execution/garbage collection.
+message RequestIdPB {
+  // The (globally unique) id of the client performing this RPC.
+  required string client_id = 1;
+
+  // The (per-client unique) sequence number of this RPC.
+  required int64 seq_no = 2;
+
+  // The sequence number of the first RPC that has not been marked as completed by the client.
+  // Documented as unset when no RPC is incomplete, but the field is 'required' -- TODO confirm.
+  required int64 first_incomplete_seq_no = 3;
+
+  // The number of times this RPC has been tried.
+  // Set to 1 in the first attempt.
+  required int64 attempt_no = 4;
+}
+
+// The header for the RPC request frame.
+message RequestHeader {
+  // A sequence number that uniquely identifies a call to a single remote server. This number is
+  // sent back in the Response and allows matching it to the original Request.
+  // Hadoop specifies a uint32 and casts it to a signed int. That is counterintuitive, so we use an
+  // int32 instead. Allowed values (inherited from Hadoop):
+  //   0 through INT32_MAX: Regular RPC call IDs.
+  //   -2: Invalid call ID.
+  //   -3: Connection context call ID.
+  //   -33: SASL negotiation call ID.
+  //
+  // NOTE: these call IDs must be increasing but may have gaps.
+  required int32 call_id = 3;
+
+  // RPC method being invoked.
+  // Not used for "connection setup" calls.
+  optional RemoteMethodPB remote_method = 6;
+
+  // Propagate the timeout as specified by the user. Note that, since there is some
+  // transit time between the client and server, if you wait exactly this amount of
+  // time and then respond, you are likely to cause a timeout on the client.
+  optional uint32 timeout_millis = 10;
+
+  // Feature flags that the service must support in order to properly interpret this
+  // request. The client can pass any set of flags, and if the server doesn't
+  // support any of them, then it will fail the request.
+  //
+  // NOTE: these are for evolving features at the level of the application, not
+  // the RPC framework. Hence, we have to use a generic int type rather than a
+  // particular enum.
+  // NOTE: the server will only interpret this field if it supports the
+  // APPLICATION_FEATURE_FLAGS flag.
+  repeated uint32 required_feature_flags = 11;
+
+  // The unique id of this request, if it's retriable and if the results are to be tracked.
+  // The request id is unique per logical request, i.e. retries of the same RPC must have the
+  // same request id.
+  // Note that this is different from 'call_id' in that a call_id is unique to a server while a
+  // request_id is unique to a logical request (i.e. the request_id remains the same when a request
+  // is retried on a different server).
+  // Optional for requests that are naturally idempotent or to maintain compatibility with
+  // older clients for requests that are not.
+  optional RequestIdPB request_id = 15;
+
+  // Byte offsets for sidecars in the main body of the request message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 16;
+}
+
+message ResponseHeader {
+  required int32 call_id = 1;  // the call_id sent in the matching RequestHeader
+
+  // If this is set, then this is an error response and the
+  // response message will be of type ErrorStatusPB instead of
+  // the expected response type.
+  optional bool is_error = 2 [ default = false ];
+
+  // Byte offsets for sidecars in the main body of the response message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 3;
+}
+
+// Sent as the response message when ResponseHeader.is_error == true.
+message ErrorStatusPB {
+
+  // These codes have all been inherited from Hadoop's RPC mechanism.
+  enum RpcErrorCodePB {
+    FATAL_UNKNOWN = 10;
+
+    // Non-fatal RPC errors. Connection should be left open for future RPC calls.
+    //------------------------------------------------------------
+    // The application generated an error status. See the message field for
+    // more details.
+    ERROR_APPLICATION = 1;
+
+    // The specified method was not valid.
+    ERROR_NO_SUCH_METHOD = 2;
+
+    // The specified service was not valid.
+    ERROR_NO_SUCH_SERVICE = 3;
+
+    // The server is overloaded - the client should try again shortly.
+    ERROR_SERVER_TOO_BUSY = 4;
+
+    // The request parameter was not parseable, was missing required fields,
+    // or the server does not support the required feature flags.
+    ERROR_INVALID_REQUEST = 5;
+
+    // The server might have previously received this request but its response is no
+    // longer cached. It's unknown whether the request was executed or not.
+    ERROR_REQUEST_STALE = 6;
+
+    // The server is not able to complete the connection or request at this
+    // time. The client may try again later.
+    ERROR_UNAVAILABLE = 7;
+
+    // FATAL_* errors indicate that the client should shut down the connection.
+    //------------------------------------------------------------
+    // The RPC server is already shutting down.
+    FATAL_SERVER_SHUTTING_DOWN = 11;
+    // Fields of RpcHeader are invalid.
+    FATAL_INVALID_RPC_HEADER = 12;
+    // Could not deserialize RPC request.
+    FATAL_DESERIALIZING_REQUEST = 13;
+    // IPC Layer version mismatch.
+    FATAL_VERSION_MISMATCH = 14;
+    // Auth failed.
+    FATAL_UNAUTHORIZED = 15;
+
+    // The authentication token is invalid or expired;
+    // the client should obtain a new one.
+    FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
+  }
+
+  required string message = 1;
+
+  // TODO: Make code required?
+  optional RpcErrorCodePB code = 2;  // Specific error identifier.
+
+  // If the request is failed due to an unsupported feature flag, the particular
+  // flag(s) that were not supported will be sent back to the client.
+  repeated uint32 unsupported_feature_flags = 3;
+
+  // Allow extensions. When the RPC returns ERROR_APPLICATION, the server
+  // should also fill in exactly one of these extension fields, which contains
+  // more details on the service-specific error.
+  extensions 100 to max;
+}
+
+extend google.protobuf.MethodOptions {
+  // An option for RPC methods that controls whether that method's
+  // RPC results should be tracked with a ResultTracker.
+  optional bool track_rpc_result = 50006 [default=false];
+
+  // An option to set the authorization method for this particular
+  // RPC method. If this is not specified, the service's 'default_authz_method'
+  // is used.
+  optional string authz_method = 50007;
+}
+
+extend google.protobuf.ServiceOptions {
+  // Set the default authorization method for the RPCs in this service.
+  // If this is not set, then the default authorization is to allow all
+  // RPCs.
+  optional string default_authz_method = 50007;  // same number as authz_method; other extendee
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
new file mode 100644
index 0000000..7685903
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Protobuf used for introspection of RPC services (eg listing in-flight RPCs,
+// reflection, etc)
+syntax = "proto2";
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu";
+
+import "kudu/rpc/rpc_header.proto";
+
+message RpcCallInProgressPB {  // One entry of RpcConnectionPB.calls_in_flight.
+  required RequestHeader header = 1;
+  optional string trace_buffer = 2;
+  optional uint64 micros_elapsed = 3;
+
+  enum State {
+    UNKNOWN = 999;
+
+    // States for OutboundCall
+    ON_OUTBOUND_QUEUE = 1;
+    SENDING = 2;
+    SENT = 3;
+    TIMED_OUT = 4;
+    FINISHED_ERROR = 5;
+    FINISHED_SUCCESS = 6;
+    NEGOTIATION_TIMED_OUT = 7;
+    FINISHED_NEGOTIATION_ERROR = 8;
+    CANCELLED = 9;
+
+    // TODO(todd): add states for InboundCall
+  }
+
+  optional State state = 4;
+}
+
+message RpcConnectionPB {  // One connection, as listed in DumpRunningRpcsResponsePB.
+  enum StateType {
+    UNKNOWN = 999;
+    NEGOTIATING = 0;  // Connection is still being negotiated.
+    OPEN = 1;         // Connection is active.
+  };
+
+  required string remote_ip = 1;
+  required StateType state = 2;
+  // TODO: swap out for separate fields
+  optional string remote_user_credentials = 3;
+  repeated RpcCallInProgressPB calls_in_flight = 4;
+  optional int64 outbound_queue_size = 5;
+}
+
+message DumpRunningRpcsRequestPB {
+  optional bool include_traces = 1 [ default = false ];  // false when not specified
+}
+
+message DumpRunningRpcsResponsePB {  // Connections, split into inbound and outbound lists.
+  repeated RpcConnectionPB inbound_connections = 1;
+  repeated RpcConnectionPB outbound_connections = 2;
+}
+
+//------------------------------------------------------------
+
+// A single TraceMetric key/value pair from a sampled RPC (see RpczSamplePB.metrics).
+message TraceMetricPB {
+  // A '.'-separated path through the parent-child trace hierarchy.
+  optional string child_path = 1;
+  optional string key = 2;
+  optional int64 value = 3;
+}
+
+// A single sampled RPC call.
+message RpczSamplePB {
+  // The original request header.
+  optional RequestHeader header = 1;
+  // The stringified request trace.
+  optional string trace = 2;
+  // The number of milliseconds that this call took to complete.
+  optional int32 duration_ms = 3;
+  // The metrics from the sampled trace.
+  repeated TraceMetricPB metrics = 4;
+}
+
+// A set of samples for a particular RPC method (see DumpRpczStoreResponsePB.methods).
+message RpczMethodPB {
+  required string method_name = 1;
+  repeated RpczSamplePB samples = 2;
+}
+
+// Request and response for dumping previously sampled RPC calls. The request has no fields.
+message DumpRpczStoreRequestPB {
+}
+message DumpRpczStoreResponsePB {  // Sampled calls, one RpczMethodPB entry per method.
+  repeated RpczMethodPB methods = 1;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_service.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_service.h b/be/src/kudu/rpc/rpc_service.h
new file mode 100644
index 0000000..dcaa9c1
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_service.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_SERVICE_H_
+#define KUDU_RPC_SERVICE_H_
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethod;
+struct RpcMethodInfo;
+class InboundCall;
+
+class RpcService : public RefCountedThreadSafe<RpcService> {  // Ref-counted service interface.
+ public:
+  virtual ~RpcService() {}
+
+  // Enqueue a call for processing.
+  // On failure, the RpcService::QueueInboundCall() implementation is
+  // responsible for responding to the client with a failure message.
+  virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) = 0;
+
+  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+    return nullptr;  // default implementation: ignores 'method' and returns nullptr
+  }
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_SERVICE_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.cc b/be/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..b4de678
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include <google/protobuf/repeated_field.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/status.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// RpcSidecar backed by a plain Slice. The sidecar never owns the bytes the slice
+// refers to, so the caller has to keep that memory alive for at least as long as
+// the sidecar itself.
+class SliceSidecar : public RpcSidecar {
+ public:
+  explicit SliceSidecar(Slice slice)
+      : slice_(slice) {
+  }
+
+  // Returns the wrapped slice unchanged.
+  Slice AsSlice() const override {
+    return slice_;
+  }
+
+ private:
+  // Borrowed view of the sidecar data; fixed at construction.
+  const Slice slice_;
+};
+
+// RpcSidecar that holds a faststring through a unique_ptr and exposes its contents
+// as a Slice. The buffer is released when the sidecar is destroyed.
+class FaststringSidecar : public RpcSidecar {
+ public:
+  explicit FaststringSidecar(unique_ptr<faststring> data)
+      : data_(std::move(data)) {
+  }
+
+  // Returns a Slice converted from the held faststring. 'data_' is dereferenced
+  // unconditionally, so the sidecar must have been built from a non-null pointer.
+  Slice AsSlice() const override {
+    return *data_;
+  }
+
+ private:
+  // Owned buffer; the pointer itself is never reseated after construction.
+  const unique_ptr<faststring> data_;
+};
+
+// Wraps 'data' in a FaststringSidecar, which takes over ownership of the buffer.
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+  unique_ptr<RpcSidecar> sidecar(new FaststringSidecar(std::move(data)));
+  return sidecar;
+}
+
+// Wraps 'slice' in a SliceSidecar. The bytes are not copied; the caller must keep
+// them valid for as long as the returned sidecar is in use.
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+  unique_ptr<RpcSidecar> sidecar(new SliceSidecar(slice));
+  return sidecar;
+}
+
+
+// Splits 'buffer' into one Slice per entry of 'offsets' and stores the results in
+// 'sidecars[0 .. offsets.size() - 1]'.
+//
+// Entry i of 'offsets' is the byte position within 'buffer' at which sidecar i
+// starts. Each sidecar ends where the next one begins, and the last sidecar runs to
+// the end of 'buffer'. The caller must provide room for at least offsets.size()
+// slices in 'sidecars'; because more than TransferLimits::kMaxSidecars offsets are
+// rejected, an array of that many entries is always sufficient.
+//
+// Returns OK without touching 'sidecars' when 'offsets' is empty. Returns Corruption
+// when there are more than TransferLimits::kMaxSidecars offsets, when 'buffer' is
+// larger than TransferLimits::kMaxTotalSidecarBytes, or when an offset is smaller
+// than its predecessor or lies beyond the end of 'buffer'. On error, some leading
+// entries of 'sidecars' may already have been written.
+Status RpcSidecar::ParseSidecars(
+    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+    Slice buffer, Slice* sidecars) {
+  if (offsets.size() == 0) return Status::OK();
+
+  int last = offsets.size() - 1;
+  if (last >= TransferLimits::kMaxSidecars) {
+    // Report the number of sidecars actually received rather than the index of the
+    // last one, which is one less.
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most $1",
+            offsets.size(), TransferLimits::kMaxSidecars));
+  }
+
+  if (buffer.size() > TransferLimits::kMaxTotalSidecarBytes) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 payload bytes, expected at most $1",
+            buffer.size(), TransferLimits::kMaxTotalSidecarBytes));
+  }
+
+  // Every sidecar except the last is delimited by two consecutive offsets.
+  for (int i = 0; i < last; ++i) {
+    int64_t cur_offset = offsets.Get(i);
+    int64_t next_offset = offsets.Get(i + 1);
+    if (next_offset > buffer.size()) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " has length $2, but the entire message has length $3",
+              i, cur_offset, (next_offset - cur_offset), buffer.size()));
+    }
+    if (next_offset < cur_offset) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " but ends before that at offset $2.", i, cur_offset, next_offset));
+    }
+
+    // Both checks passed, so cur_offset <= next_offset <= buffer.size().
+    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+  }
+
+  // The last sidecar extends from its offset to the end of the buffer.
+  int64_t cur_offset = offsets.Get(last);
+  if (cur_offset > buffer.size()) {
+    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+            "starts at offset $1 after message ends (message length $2).", last,
+            cur_offset, buffer.size()));
+  }
+  sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+
+  return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.h b/be/src/kudu/rpc/rpc_sidecar.h
new file mode 100644
index 0000000..bfbfcea
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_SIDECAR_H
+#define KUDU_RPC_RPC_SIDECAR_H
+
+#include <memory>
+
+#include <google/protobuf/repeated_field.h> // IWYU pragma: keep
+#include <google/protobuf/stubs/port.h>
+
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+class Status;
+class faststring;
+
+namespace rpc {
+
+// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
+// without extra copies. In other words, whenever a protobuf would have a large field
+// where additional copies become expensive, one may opt instead to use an RpcSidecar.
+//
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
+//
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the
+// RPC response on the server side, an index for that sidecar is returned. This index must
+// then in some way (e.g., via protobuf) be communicated to the recipient.
+//
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
+class RpcSidecar {
+ public:
+  // Creates a sidecar that takes ownership of 'data' and exposes its contents.
+  static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+
+  // Creates a sidecar that merely wraps 'slice'. The underlying data is not owned by
+  // the sidecar, so the caller must keep it alive at least as long as the sidecar.
+  static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
+
+  // Utility that carves 'buffer' into individual sidecar slices according to
+  // 'offsets' and writes them into 'sidecars', starting at index 0. The 'sidecars'
+  // array must have room for at least TransferLimits::kMaxSidecars entries.
+  // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+  static Status ParseSidecars(
+      const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+      Slice buffer, Slice* sidecars);
+
+  // Returns the sidecar's data in the form of a Slice.
+  virtual Slice AsSlice() const = 0;
+
+  virtual ~RpcSidecar() = default;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+
+#endif /* KUDU_RPC_RPC_SIDECAR_H */