You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:50 UTC
[38/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc.h b/be/src/kudu/rpc/rpc.h
new file mode 100644
index 0000000..bd195dc
--- /dev/null
+++ b/be/src/kudu/rpc/rpc.h
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_H
+#define KUDU_RPC_RPC_H
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace rpc {
+
+class Messenger;
+class Rpc;
+
+// Result status of a retriable Rpc.
+//
+// TODO Consider merging this with ScanRpcStatus.
+struct RetriableRpcStatus {
+ enum Result {
+ // There was no error, i.e. the Rpc was successful.
+ OK,
+
+ // The Rpc got an error and it's not retriable.
+ NON_RETRIABLE_ERROR,
+
+ // The server couldn't be reached, i.e. there was a network error while
+ // reaching the replica or a DNS resolution problem.
+ SERVER_NOT_ACCESSIBLE,
+
+ // The server received the request but it was not ready to serve it right
+ // away. It might happen that the server was too busy and did not have
+ // necessary resources or information to serve the request but it
+ // anticipates it should be ready to serve the request really soon, so it's
+ // worth retrying the request at a later time.
+ SERVICE_UNAVAILABLE,
+
+ // For rpc's that are meant only for the leader of a shared resource, when the server
+ // we're interacting with is not the leader.
+ REPLICA_NOT_LEADER,
+
+ // The server doesn't know the resource we're interacting with. For instance a TabletServer
+ // is not part of the config for the tablet we're trying to write to.
+ RESOURCE_NOT_FOUND,
+
+ // The authentication token supplied with the operation was found invalid
+ // by the server. Most likely, the token has expired. If so, get a new token
+ // using client credentials and retry the operation with it.
+ INVALID_AUTHENTICATION_TOKEN,
+ };
+
+ Result result;
+ Status status;
+};
+
+// This class picks a server among a possible set of servers serving a given resource.
+//
+// TODO Currently this only picks the leader, though it wouldn't be unfeasible to have this
+// have an enum so that it can pick any server.
+template <class Server>
+class ServerPicker : public RefCountedThreadSafe<ServerPicker<Server>> {
+ public:
+ virtual ~ServerPicker() {}
+
+ typedef Callback<void(const Status& status, Server* server)> ServerPickedCallback;
+
+ // Picks the leader among the replicas serving a resource.
+ // If the leader was found, it calls the callback with Status::OK() and
+ // with 'server' set to the current leader, otherwise calls the callback
+ // with 'status' set to the failure reason, and 'server' set to nullptr.
+ // If picking a leader takes longer than 'deadline' the callback is called with
+ // Status::TimedOut().
+ virtual void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) = 0;
+
+ // Marks a server as failed/unacessible.
+ virtual void MarkServerFailed(Server *server, const Status &status) = 0;
+
+ // Marks a server as not the leader of config serving the resource we're trying to interact with.
+ virtual void MarkReplicaNotLeader(Server* replica) = 0;
+
+ // Marks a server as not serving the resource we want.
+ virtual void MarkResourceNotFound(Server *replica) = 0;
+};
+
+// Provides utilities for retrying failed RPCs.
+//
+// All RPCs should use HandleResponse() to retry certain generic errors.
+class RpcRetrier {
+ public:
+ RpcRetrier(MonoTime deadline, std::shared_ptr<rpc::Messenger> messenger)
+ : attempt_num_(1),
+ deadline_(deadline),
+ messenger_(std::move(messenger)) {
+ if (deadline_.Initialized()) {
+ controller_.set_deadline(deadline_);
+ }
+ controller_.Reset();
+ }
+
+ // Tries to handle a failed RPC.
+ //
+ // If it was handled (e.g. scheduled for retry in the future), returns
+ // true. In this case, callers should ensure that 'rpc' remains alive.
+ //
+ // Otherwise, returns false and writes the controller status to
+ // 'out_status'.
+ bool HandleResponse(Rpc* rpc, Status* out_status);
+
+ // Retries an RPC at some point in the near future. If 'why_status' is not OK,
+ // records it as the most recent error causing the RPC to retry. This is
+ // reported to the caller eventually if the RPC never succeeds.
+ //
+ // If the RPC's deadline expires, the callback will fire with a timeout
+ // error when the RPC comes up for retrying. This is true even if the
+ // deadline has already expired at the time that Retry() was called.
+ //
+ // Callers should ensure that 'rpc' remains alive.
+ void DelayedRetry(Rpc* rpc, const Status& why_status);
+
+ RpcController* mutable_controller() { return &controller_; }
+ const RpcController& controller() const { return controller_; }
+
+ const MonoTime& deadline() const { return deadline_; }
+
+ const std::shared_ptr<Messenger>& messenger() const {
+ return messenger_;
+ }
+
+ int attempt_num() const { return attempt_num_; }
+
+ // Called when an RPC comes up for retrying. Actually sends the RPC.
+ void DelayedRetryCb(Rpc* rpc, const Status& status);
+
+ private:
+ // The next sent rpc will be the nth attempt (indexed from 1).
+ int attempt_num_;
+
+ // If the remote end is busy, the RPC will be retried (with a small
+ // delay) until this deadline is reached.
+ //
+ // May be uninitialized.
+ MonoTime deadline_;
+
+ // Messenger to use when sending the RPC.
+ std::shared_ptr<Messenger> messenger_;
+
+ // RPC controller to use when sending the RPC.
+ RpcController controller_;
+
+ // In case any retries have already happened, remembers the last error.
+ // Errors from the server take precedence over timeout errors.
+ Status last_error_;
+
+ DISALLOW_COPY_AND_ASSIGN(RpcRetrier);
+};
+
+// An in-flight remote procedure call to some server.
+class Rpc {
+ public:
+ Rpc(const MonoTime& deadline,
+ std::shared_ptr<rpc::Messenger> messenger)
+ : retrier_(deadline, std::move(messenger)) {
+ }
+
+ virtual ~Rpc() {}
+
+ // Asynchronously sends the RPC to the remote end.
+ //
+ // Subclasses should use SendRpcCb() below as the callback function.
+ virtual void SendRpc() = 0;
+
+ // Returns a string representation of the RPC.
+ virtual std::string ToString() const = 0;
+
+ // Returns the number of times this RPC has been sent. Will always be at
+ // least one.
+ int num_attempts() const { return retrier().attempt_num(); }
+
+ protected:
+ const RpcRetrier& retrier() const { return retrier_; }
+ RpcRetrier* mutable_retrier() { return &retrier_; }
+
+ private:
+ friend class RpcRetrier;
+
+ // Callback for SendRpc(). If 'status' is not OK, something failed
+ // before the RPC was sent.
+ virtual void SendRpcCb(const Status& status) = 0;
+
+ // Used to retry some failed RPCs.
+ RpcRetrier retrier_;
+
+ DISALLOW_COPY_AND_ASSIGN(Rpc);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_RPC_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc
new file mode 100644
index 0000000..97da445
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.cc
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_context.h"
+
+#include <memory>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+#include <google/protobuf/message.h>
+
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+using google::protobuf::Message;
+using kudu::pb_util::SecureDebugString;
+using std::string;
+using std::unique_ptr;
+
+namespace kudu {
+
+class Slice;
+
+namespace rpc {
+
+RpcContext::RpcContext(InboundCall *call,
+ const google::protobuf::Message *request_pb,
+ google::protobuf::Message *response_pb,
+ scoped_refptr<ResultTracker> result_tracker)
+ : call_(CHECK_NOTNULL(call)),
+ request_pb_(request_pb),
+ response_pb_(response_pb),
+ result_tracker_(std::move(result_tracker)) {
+ VLOG(4) << call_->remote_method().service_name() << ": Received RPC request for "
+ << call_->ToString() << ":" << std::endl << SecureDebugString(*request_pb_);
+ TRACE_EVENT_ASYNC_BEGIN2("rpc_call", "RPC", this,
+ "call", call_->ToString(),
+ "request", pb_util::PbTracer::TracePb(*request_pb_));
+}
+
+RpcContext::~RpcContext() {
+}
+
+void RpcContext::RespondSuccess() {
+ if (AreResultsTracked()) {
+ result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
+ response_pb_.get());
+ } else {
+ VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success response for "
+ << call_->ToString() << ":" << std::endl << SecureDebugString(*response_pb_);
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(*response_pb_),
+ "trace", trace()->DumpToString());
+ call_->RespondSuccess(*response_pb_);
+ delete this;
+ }
+}
+
+void RpcContext::RespondNoCache() {
+ if (AreResultsTracked()) {
+ result_tracker_->FailAndRespond(call_->header().request_id(),
+ response_pb_.get());
+ } else {
+ VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+ << call_->ToString() << ": " << SecureDebugString(*response_pb_);
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(*response_pb_),
+ "trace", trace()->DumpToString());
+ // This is a bit counter intuitive, but when we get the failure but set the error on the
+ // call's response we call RespondSuccess() instead of RespondFailure().
+ call_->RespondSuccess(*response_pb_);
+ delete this;
+ }
+}
+
+void RpcContext::RespondFailure(const Status &status) {
+ return RespondRpcFailure(ErrorStatusPB::ERROR_APPLICATION, status);
+}
+
+void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
+ if (AreResultsTracked()) {
+ result_tracker_->FailAndRespond(call_->header().request_id(),
+ err, status);
+ } else {
+ VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+ << call_->ToString() << ": " << status.ToString();
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "status", status.ToString(),
+ "trace", trace()->DumpToString());
+ call_->RespondFailure(err, status);
+ delete this;
+ }
+}
+
+void RpcContext::RespondApplicationError(int error_ext_id, const std::string& message,
+ const Message& app_error_pb) {
+ if (AreResultsTracked()) {
+ result_tracker_->FailAndRespond(call_->header().request_id(),
+ error_ext_id, message, app_error_pb);
+ } else {
+ if (VLOG_IS_ON(4)) {
+ ErrorStatusPB err;
+ InboundCall::ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
+ VLOG(4) << call_->remote_method().service_name()
+ << ": Sending application error response for " << call_->ToString()
+ << ":" << std::endl << SecureDebugString(err);
+ }
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(app_error_pb),
+ "trace", trace()->DumpToString());
+ call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+ delete this;
+ }
+}
+
+const rpc::RequestIdPB* RpcContext::request_id() const {
+ return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
+}
+
+size_t RpcContext::GetTransferSize() const {
+ return call_->GetTransferSize();
+}
+
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+ return call_->AddOutboundSidecar(std::move(car), idx);
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {
+ return call_->GetInboundSidecar(idx, slice);
+}
+
+const RemoteUser& RpcContext::remote_user() const {
+ return call_->remote_user();
+}
+
+bool RpcContext::is_confidential() const {
+ return call_->connection()->is_confidential();
+}
+
+void RpcContext::DiscardTransfer() {
+ call_->DiscardTransfer();
+}
+
+const Sockaddr& RpcContext::remote_address() const {
+ return call_->remote_address();
+}
+
+std::string RpcContext::requestor_string() const {
+ return call_->remote_user().ToString() + " at " +
+ call_->remote_address().ToString();
+}
+
+std::string RpcContext::method_name() const {
+ return call_->remote_method().method_name();
+}
+
+std::string RpcContext::service_name() const {
+ return call_->remote_method().service_name();
+}
+
+MonoTime RpcContext::GetClientDeadline() const {
+ return call_->GetClientDeadline();
+}
+
+MonoTime RpcContext::GetTimeReceived() const {
+ return call_->GetTimeReceived();
+}
+
+Trace* RpcContext::trace() {
+ return call_->trace();
+}
+
+void RpcContext::Panic(const char* filepath, int line_number, const string& message) {
+ // Use the LogMessage class directly so that the log messages appear to come from
+ // the line of code which caused the panic, not this code.
+#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream()
+#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream()
+
+ MY_ERROR << "Panic handling " << call_->ToString() << ": " << message;
+ MY_ERROR << "Request:\n" << SecureDebugString(*request_pb_);
+ Trace* t = trace();
+ if (t) {
+ MY_ERROR << "RPC trace:";
+ t->Dump(&MY_ERROR, true);
+ }
+ MY_FATAL << "Exiting due to panic.";
+
+#undef MY_ERROR
+#undef MY_FATAL
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.h b/be/src/kudu/rpc/rpc_context.h
new file mode 100644
index 0000000..c729d5e
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_context.h
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTEXT_H
+#define KUDU_RPC_RPC_CONTEXT_H
+
+#include <memory>
+#include <stddef.h>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Slice;
+class Sockaddr;
+class Trace;
+
+namespace rpc {
+
+class InboundCall;
+class RemoteUser;
+class ResultTracker;
+class RpcSidecar;
+
+#define PANIC_RPC(rpc_context, message) \
+ do { \
+ if (rpc_context) { \
+ rpc_context->Panic(__FILE__, __LINE__, (message)); \
+ } else { \
+ LOG(FATAL) << message; \
+ } \
+ } while (0)
+
+// The context provided to a generated ServiceIf. This provides
+// methods to respond to the RPC. In the future, this will also
+// include methods to access information about the caller: e.g
+// authentication info, tracing info, and cancellation status.
+//
+// This is the server-side analogue to the RpcController class.
+class RpcContext {
+ public:
+ // Create an RpcContext. This is called only from generated code
+ // and is not a public API.
+ RpcContext(InboundCall *call,
+ const google::protobuf::Message *request_pb,
+ google::protobuf::Message *response_pb,
+ scoped_refptr<ResultTracker> result_tracker);
+
+ ~RpcContext();
+
+ // Return the trace buffer for this call.
+ Trace* trace();
+
+ // Send a response to the call. The service may call this method
+ // before or after returning from the original handler method,
+ // and it may call this method from a different thread.
+ //
+ // The response should be prepared already in the response PB pointer
+ // which was passed to the handler method.
+ //
+ // After this method returns, this RpcContext object is destroyed. The request
+ // and response protobufs are also destroyed.
+ void RespondSuccess();
+
+ // Like the above, but doesn't store the results of the service call, if results
+ // are being tracked.
+ // Used in cases where a call specific error was set on the response protobuf,
+ // the call should be considered failed, thus results shouldn't be cached.
+ void RespondNoCache();
+
+ // Respond with an error to the client. This sends back an error with the code
+ // ERROR_APPLICATION. Because there is no more specific error code passed back
+ // to the client, most applications should create a custom error PB extension
+ // and use RespondApplicationError(...) below. This method should only be used
+ // for unexpected errors where the server doesn't expect the client to do any
+ // more advanced handling.
+ //
+ // After this method returns, this RpcContext object is destroyed. The request
+ // and response protobufs are also destroyed.
+ void RespondFailure(const Status &status);
+
+ // Respond with an RPC-level error. This typically manifests to the client as
+ // a remote error, one whose handling is agnostic to the particulars of the
+ // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE
+ // usually cause the client to retry the RPC at a later time.
+ //
+ // After this method returns, this RpcContext object is destroyed. The request
+ // and response protobufs are also destroyed.
+ void RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status);
+
+ // Respond with an application-level error. This causes the caller to get a
+ // RemoteError status with the provided string message. Additionally, a
+ // service-specific error extension is passed back to the client. The
+ // extension must be registered with the ErrorStatusPB protobuf. For
+ // example:
+ //
+ // message MyServiceError {
+ // extend kudu.rpc.ErrorStatusPB {
+ // optional MyServiceError my_service_error_ext = 101;
+ // }
+ // // Add any extra fields or status codes you want to pass back to
+ // // the client here.
+ // required string extra_error_data = 1;
+ // }
+ //
+ // NOTE: the numeric '101' above must be an integer greater than 101
+ // and must be unique across your code base.
+ //
+ // Given the above definition in your service protobuf file, you would
+ // use this method like:
+ //
+ // MyServiceError err;
+ // err.set_extra_error_data("foo bar");
+ // ctx->RespondApplicationError(MyServiceError::my_service_error_ext.number(),
+ // "Some error occurred", err);
+ //
+ // The client side may then retreieve the error by calling:
+ // const MyServiceError& err_details =
+ // controller->error_response()->GetExtension(MyServiceError::my_service_error_ext);
+ //
+ // After this method returns, this RpcContext object is destroyed. The request
+ // and response protobufs are also destroyed.
+ void RespondApplicationError(int error_ext_id, const std::string& message,
+ const google::protobuf::Message& app_error_pb);
+
+
+ // Adds an RpcSidecar to the response. This is the preferred method for
+ // transferring large amounts of binary data, because this avoids additional
+ // copies made by serializing the protobuf.
+ //
+ // Assumes no changes to the sidecar's data are made after insertion.
+ //
+ // Upon success, writes the index of the sidecar (necessary to be retrieved
+ // later) to 'idx'. Call may fail if all sidecars have already been used
+ // by the RPC response.
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+ // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out
+ // of bounds.
+ Status GetInboundSidecar(int idx, Slice* slice);
+
+ // Return the identity of remote user who made this call.
+ const RemoteUser& remote_user() const;
+
+ // Whether it's OK to pass confidential information between the client and the
+ // server in the context of the RPC call being handled. In real world, this
+ // translates into properties of the connection between the client and the
+ // server. For example, this methods returns 'true' for a call over an
+ // encrypted connection.
+ bool is_confidential() const;
+
+ // Discards the memory associated with the inbound call's payload. All previously
+ // obtained sidecar slices will be invalidated by this call. It is an error to call
+ // GetInboundSidecar() after this method. request_pb() remains valid.
+ // This is useful in the case where the server wishes to delay responding to an RPC
+ // (perhaps to control the rate of RPC requests), but knows that the RPC payload itself
+ // won't be processed any further.
+ void DiscardTransfer();
+
+ // Return the remote IP address and port which sent the current RPC call.
+ const Sockaddr& remote_address() const;
+
+ // A string identifying the requestor -- both the user info and the IP address.
+ // Suitable for use in log messages.
+ std::string requestor_string() const;
+
+ // Return the name of the RPC service method being called.
+ std::string method_name() const;
+
+ // Return the name of the RPC service being called.
+ std::string service_name() const;
+
+ const google::protobuf::Message *request_pb() const { return request_pb_.get(); }
+ google::protobuf::Message *response_pb() const { return response_pb_.get(); }
+
+ // Return an upper bound on the client timeout deadline. This does not
+ // account for transmission delays between the client and the server.
+ // If the client did not specify a deadline, returns MonoTime::Max().
+ MonoTime GetClientDeadline() const;
+
+ // Return the time when the inbound call was received.
+ MonoTime GetTimeReceived() const;
+
+ // Whether the results of this RPC are tracked with a ResultTracker.
+ // If this returns true, both result_tracker() and request_id() should return non-null results.
+ bool AreResultsTracked() const { return result_tracker_.get() != nullptr; }
+
+ // Returns this call's result tracker, if it is set.
+ const scoped_refptr<ResultTracker>& result_tracker() const {
+ return result_tracker_;
+ }
+
+ // Returns this call's request id, if it is set.
+ const rpc::RequestIdPB* request_id() const;
+
+ // Returns the size of the transfer buffer that backs 'call_'. If the
+ // transfer buffer no longer exists (e.g. GetTransferSize() is called after
+ // DiscardTransfer()), returns 0.
+ size_t GetTransferSize() const;
+
+ // Panic the server. This logs a fatal error with the given message, and
+ // also includes the current RPC request, requestor, trace information, etc,
+ // to make it easier to debug.
+ //
+ // Call this via the PANIC_RPC() macro.
+ void Panic(const char* filepath, int line_number, const std::string& message)
+ __attribute__((noreturn));
+
+ private:
+ friend class ResultTracker;
+ InboundCall* const call_;
+ const gscoped_ptr<const google::protobuf::Message> request_pb_;
+ const gscoped_ptr<google::protobuf::Message> response_pb_;
+ scoped_refptr<ResultTracker> result_tracker_;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.cc b/be/src/kudu/rpc/rpc_controller.cc
new file mode 100644
index 0000000..77c7ca4
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.cc
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_controller.h"
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/slice.h"
+
+
+using std::unique_ptr;
+using strings::Substitute;
+namespace kudu {
+
+namespace rpc {
+
+RpcController::RpcController()
+ : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS), messenger_(nullptr) {
+ DVLOG(4) << "RpcController " << this << " constructed";
+}
+
+RpcController::~RpcController() {
+ DVLOG(4) << "RpcController " << this << " destroyed";
+}
+
+void RpcController::Swap(RpcController* other) {
+ // Cannot swap RPC controllers while they are in-flight.
+ if (call_) {
+ CHECK(finished());
+ }
+ if (other->call_) {
+ CHECK(other->finished());
+ }
+
+ std::swap(outbound_sidecars_, other->outbound_sidecars_);
+ std::swap(outbound_sidecars_total_bytes_, other->outbound_sidecars_total_bytes_);
+ std::swap(timeout_, other->timeout_);
+ std::swap(credentials_policy_, other->credentials_policy_);
+ std::swap(call_, other->call_);
+}
+
+void RpcController::Reset() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ if (call_) {
+ CHECK(finished());
+ }
+ call_.reset();
+ required_server_features_.clear();
+ credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
+ messenger_ = nullptr;
+ outbound_sidecars_total_bytes_ = 0;
+}
+
+bool RpcController::finished() const {
+ if (call_) {
+ return call_->IsFinished();
+ }
+ return false;
+}
+
+bool RpcController::negotiation_failed() const {
+ if (call_) {
+ DCHECK(finished());
+ return call_->IsNegotiationError();
+ }
+ return false;
+}
+
+Status RpcController::status() const {
+ if (call_) {
+ return call_->status();
+ }
+ return Status::OK();
+}
+
+const ErrorStatusPB* RpcController::error_response() const {
+ if (call_) {
+ return call_->error_pb();
+ }
+ return nullptr;
+}
+
+Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
+ return call_->call_response_->GetSidecar(idx, sidecar);
+}
+
+void RpcController::set_timeout(const MonoDelta& timeout) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ DCHECK(!call_ || call_->state() == OutboundCall::READY);
+ timeout_ = timeout;
+}
+
+void RpcController::set_deadline(const MonoTime& deadline) {
+ set_timeout(deadline - MonoTime::Now());
+}
+
+void RpcController::SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id) {
+ request_id_ = std::move(request_id);
+}
+
+bool RpcController::has_request_id() const {
+ return request_id_ != nullptr;
+}
+
+const RequestIdPB& RpcController::request_id() const {
+ DCHECK(has_request_id());
+ return *request_id_;
+}
+
+void RpcController::RequireServerFeature(uint32_t feature) {
+ DCHECK(!call_ || call_->state() == OutboundCall::READY);
+ required_server_features_.insert(feature);
+}
+
+MonoDelta RpcController::timeout() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return timeout_;
+}
+
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+ if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+ return Status::RuntimeError("All available sidecars already used");
+ }
+ int64_t sidecar_bytes = car->AsSlice().size();
+ if (outbound_sidecars_total_bytes_ >
+ TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
+ return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
+ static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
+ TransferLimits::kMaxTotalSidecarBytes));
+ }
+
+ outbound_sidecars_.emplace_back(std::move(car));
+ outbound_sidecars_total_bytes_ += sidecar_bytes;
+ DCHECK_GE(outbound_sidecars_total_bytes_, 0);
+ *idx = outbound_sidecars_.size() - 1;
+ return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+ DCHECK(call_ != nullptr);
+ call_->SetRequestPayload(req, std::move(outbound_sidecars_));
+}
+
+void RpcController::Cancel() {
+ DCHECK(call_);
+ DCHECK(messenger_);
+ messenger_->QueueCancellation(call_);
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.h b/be/src/kudu/rpc/rpc_controller.h
new file mode 100644
index 0000000..aa61d83
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_controller.h
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_CONTROLLER_H
+#define KUDU_RPC_RPC_CONTROLLER_H
+
+#include <cstdint>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Slice;
+
+namespace rpc {
+
+class ErrorStatusPB;
+class Messenger;
+class OutboundCall;
+class RequestIdPB;
+class RpcSidecar;
+
+// Authentication credentials policy for outbound RPCs. Some RPC methods
+// (e.g. MasterService::ConnectToMaster) behave differently depending on the
+// type of credentials used for authentication when establishing the connection.
+// The client expecting some particular results from the call should specify
+// the required policy on a per-call basis using RpcController. By default,
+// RpcController uses ANY_CREDENTIALS.
+enum class CredentialsPolicy {
+ // It's acceptable to use authentication credentials of any type, primary or
+ // secondary ones.
+ ANY_CREDENTIALS,
+
+ // Only primary credentials are acceptable. Primary credentials are Kerberos
+ // tickets, TLS certificate. Secondary credentials are authentication tokens:
+ // they are 'derived' in the sense that it's possible to acquire them using
+ // 'primary' credentials.
+ PRIMARY_CREDENTIALS,
+};
+
+// Controller for managing properties of a single RPC call, on the client side.
+//
+// An RpcController maps to exactly one call and is not thread-safe. The client
+// may use this class prior to sending an RPC in order to set properties such
+// as the call's timeout.
+//
+// After the call has been sent (e.g using Proxy::AsyncRequest()) the user
+// may invoke methods on the RpcController object in order to probe the status
+// of the call.
+class RpcController {
+ public:
+ RpcController();
+ ~RpcController();
+
+ // Swap the state of the controller (including ownership of sidecars, buffers,
+ // etc) with another one.
+ void Swap(RpcController* other);
+
+ // Reset this controller so it may be used with another call.
+ // Note that this resets the required server features.
+ void Reset();
+
+ // Return true if the call has finished.
+ // A call is finished if the server has responded, or if the call
+ // has timed out.
+ bool finished() const;
+
+ // Whether the call failed due to connection negotiation error.
+ bool negotiation_failed() const;
+
+ // Return the current status of a call.
+ //
+ // A call is "OK" status until it finishes, at which point it may
+ // either remain in "OK" status (if the call was successful), or
+ // change to an error status. Error status indicates that there was
+ // some RPC-layer issue with making the call, for example, one of:
+ //
+ // * failed to establish a connection to the server
+ // * the server was too busy to handle the request
+ // * the server was unable to interpret the request (eg due to a version
+ // mismatch)
+ // * a network error occurred which caused the connection to be torn
+ // down
+ // * the call timed out
+ Status status() const;
+
+ // If status() returns a RemoteError object, then this function returns
+ // the error response provided by the server. Service implementors may
+ // use protobuf Extensions to add application-specific data to this PB.
+ //
+ // If Status was not a RemoteError, this returns NULL.
+ // The returned pointer is only valid as long as the controller object.
+ const ErrorStatusPB* error_response() const;
+
+ // Set the timeout for the call to be made with this RPC controller.
+ //
+ // The configured timeout applies to the entire time period between
+ // the AsyncRequest() method call and getting a response. For example,
+ // if it takes too long to establish a connection to the remote host,
+ // or to DNS-resolve the remote host, those will be accounted as part
+ // of the timeout period.
+ //
+ // Timeouts must be set prior to making the request -- the timeout may
+ // not currently be adjusted for an already-sent call.
+ //
+ // Using an uninitialized timeout will result in a call which never
+ // times out (not recommended!)
+ void set_timeout(const MonoDelta& timeout);
+
+ // Like a timeout, but based on a fixed point in time instead of a delta.
+ //
+ // Using an uninitialized deadline means the call won't time out.
+ void set_deadline(const MonoTime& deadline);
+
+ // Allows setting the request id for the next request sent to the server.
+ // A request id allows the server to identify each request sent by the client uniquely,
+ // in some cases even when sent to multiple servers, enabling exactly once semantics.
+ void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
+
+ // Returns whether a request id has been set on RPC header.
+ bool has_request_id() const;
+
+ // Returns the currently set request id.
+ // When the request is sent to the server, it gets "moved" from RpcController
+ // so an absence of a request after send doesn't mean one wasn't sent.
+ // REQUIRES: the controller has a request ID set.
+ const RequestIdPB& request_id() const;
+
+ // Add a requirement that the server side must support a feature with the
+ // given identifier. The set of required features is sent to the server
+ // with the RPC call, and if any required feature is not supported, the
+ // call will fail with a NotSupported() status.
+ //
+ // This can be used when an RPC call changes in a way that is protobuf-compatible,
+ // but for which it would not be appropriate for the server to simply ignore
+ // an added field. For example, consider an API call like:
+ //
+ // message DeleteAccount {
+ // optional string username = 1;
+ // optional bool dry_run = 2; // ADDED LATER!
+ // }
+ //
+ // In this case, if a new client which supports the 'dry_run' flag sends the RPC
+ // to an old server, the old server will simply ignore the unrecognized parameter,
+ // with highly problematic results. To solve this problem, the new version can
+ // add a feature flag:
+ //
+ // In .proto file
+ // ----------------
+ // enum MyFeatureFlags {
+ // UNKNOWN = 0;
+ // DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1;
+ // }
+ //
+ // In client code:
+ // ---------------
+ // if (dry_run) {
+ // rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN);
+ // req.set_dry_run(true);
+ // }
+ //
+ // This has the effect of (a) maintaining compatibility when dry_run is not specified
+ // and (b) rejecting the RPC with a "NotSupported" error when it is.
+ //
+ // NOTE: 'feature' is an int rather than an enum type because each service
+ // must define its own enum of supported features, and protobuf doesn't support
+ // any ability to 'extend' enum types. Implementers should define an enum in the
+ // service's protobuf definition as shown above.
+ void RequireServerFeature(uint32_t feature);
+
+ // Executes the provided function with a reference to the required server
+ // features.
+ const std::unordered_set<uint32_t>& required_server_features() const {
+ return required_server_features_;
+ }
+
+ // Return the configured timeout.
+ MonoDelta timeout() const;
+
+ CredentialsPolicy credentials_policy() const {
+ return credentials_policy_;
+ }
+
+ void set_credentials_policy(CredentialsPolicy policy) {
+ credentials_policy_ = policy;
+ }
+
+ // Fills the 'sidecar' parameter with the slice pointing to the i-th
+ // sidecar upon success.
+ //
+ // Should only be called if the call's finished, but the controller has not
+ // been Reset().
+ //
+ // May fail if index is invalid.
+ Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+ // Adds a sidecar to the outbound request. The index of the sidecar is written to
+ // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+ // to this request. Also returns an error if the total size of all sidecars would
+ // exceed TransferLimits::kMaxTotalSidecarBytes.
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+ // Cancel the call associated with the RpcController. This function should only be
+ // called when there is an outstanding outbound call. It's always safe to call
+ // Cancel() after you've sent a call, so long as you haven't called Reset() yet.
+ // Caller is not responsible for synchronization between cancellation and the
+ // callback. (i.e. the callback may or may not be invoked yet when Cancel()
+ // is called).
+ //
+ // Cancellation is "best effort" - i.e. it's still possible the callback passed
+ // to the call will be fired with a success status. If cancellation succeeds,
+ // the callback will be invoked with a Aborted status. Cancellation is asynchronous
+ // so the callback will still be invoked from the reactor thread.
+ void Cancel();
+
+ private:
+ friend class OutboundCall;
+ friend class Proxy;
+
+ // Set the outbound call_'s request parameter, and transfer ownership of
+ // outbound_sidecars_ to call_ in preparation for serialization.
+ void SetRequestParam(const google::protobuf::Message& req);
+
+ // Set the messenger which contains the reactor thread handling the outbound call.
+ void SetMessenger(Messenger* messenger) { messenger_ = messenger; }
+
+ MonoDelta timeout_;
+ std::unordered_set<uint32_t> required_server_features_;
+
+ // RPC authentication policy for outbound calls.
+ CredentialsPolicy credentials_policy_;
+
+ mutable simple_spinlock lock_;
+
+ // The id of this request.
+ // Ownership is transferred to OutboundCall once the call is sent.
+ std::unique_ptr<RequestIdPB> request_id_;
+
+ // The messenger which contains the reactor thread for 'call_'.
+ // Set only when 'call_' is set.
+ Messenger* messenger_;
+
+ // Once the call is sent, it is tracked here.
+ std::shared_ptr<OutboundCall> call_;
+
+ std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+ // Total size of sidecars in outbound_sidecars_. This is limited to a maximum
+ // of TransferLimits::kMaxTotalSidecarBytes.
+ int32_t outbound_sidecars_total_bytes_ = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(RpcController);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_header.proto b/be/src/kudu/rpc/rpc_header.proto
new file mode 100644
index 0000000..1d55b6a
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_header.proto
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+
+option optimize_for = SPEED;
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu.rpc";
+
+import "google/protobuf/descriptor.proto";
+import "kudu/security/token.proto";
+import "kudu/util/pb_util.proto";
+
+// The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase.
+// See the following for reference on those other protocols:
+// - https://issues.apache.org/jira/browse/HBASE-7898
+// - https://issues.apache.org/jira/browse/HADOOP-8990
+//
+// For a description of the Kudu protocol, see 'README' in this directory.
+
+// User Information proto. Included in ConnectionContextPB on connection setup.
+message UserInformationPB {
+ optional string effective_user = 1;
+ required string real_user = 2;
+}
+
+// The connection context is sent as part of the connection establishment.
+// It establishes the context for ALL RPC calls within the connection.
+// This is sent on connection setup after the connection preamble is sent
+// and SASL has been negotiated.
+// No response is sent from the server to the client.
+message ConnectionContextPB {
+ // UserInfo beyond what is determined as part of security handshake
+ // at connection time (kerberos, tokens etc).
+ //
+ // DEPRECATED: No longer used in Kudu 1.1 and later.
+ // The 'real_user' should be taken from the SASL negotiation.
+ // Impersonation (effective user) was never supported, so we'll have
+ // to add that back at some point later.
+ optional UserInformationPB DEPRECATED_user_info = 2;
+
+ // If the server sends a nonce to the client during the SASL_SUCCESS
+ // negotiation step, the client is required to encode it with SASL integrity
+ // protection and return it in this field. The nonce protects the server
+ // against a Kerberos replay attack.
+ optional bytes encoded_nonce = 3 [(REDACT) = true];
+}
+
+// Features supported by the RPC system itself.
+//
+// Note that this should be used to evolve the RPC _system_, not the semantics
+// or compatibility of individual calls.
+//
+// For example, if we were to add a feature like call or response wire
+// compression in the future, we could add a flag here to indicate that the
+// client or server supports that feature. Optional features which may safely be
+// ignored by the receiver do not need a feature flag, instead the optional
+// field feature of ProtoBuf may be utilized.
+enum RpcFeatureFlag {
+ UNKNOWN = 0;
+
+ // The RPC system is required to support application feature flags in the
+ // request and response headers.
+ APPLICATION_FEATURE_FLAGS = 1;
+
+ // The RPC system supports TLS protected connections. If both sides support
+ // this flag, the connection will automatically be wrapped in a TLS protected
+ // channel following a TLS handshake.
+ TLS = 2;
+
+ // If both sides advertise TLS_AUTHENTICATION_ONLY, this means that they
+ // agree that, after handshaking TLS, they will *not* wrap the connection
+ // in a TLS-protected channel. Instead, they will use TLS only for its
+ // handshake-based authentication.
+ //
+ // This is currently used for loopback connections only, so that compute
+ // frameworks which schedule for locality don't pay encryption overhead.
+ TLS_AUTHENTICATION_ONLY = 3;
+};
+
+// An authentication type. This is modeled as a oneof in case any of these
+// authentication types, or any authentication types in the future, need to add
+// extra type-specific parameters during negotiation.
+message AuthenticationTypePB {
+ message Sasl {};
+ message Token {};
+ message Certificate {};
+
+ oneof type {
+ // The server and client mutually authenticate via SASL.
+ Sasl sasl = 1;
+
+ // The server authenticates the client via a signed token, and the client
+ // authenticates the server by verifying its certificate has been signed by
+ // a trusted CA.
+ //
+ // Token authentication requires the connection to be TLS encrypted.
+ Token token = 2;
+
+ // The server and client mutually authenticate by certificate.
+ //
+ // Certificate authentication requires the connection to be TLS encrypted.
+ Certificate certificate = 3;
+ }
+}
+
+// Message type passed back & forth for the SASL negotiation.
+message NegotiatePB {
+ enum NegotiateStep {
+ UNKNOWN = 999;
+ NEGOTIATE = 1;
+ SASL_SUCCESS = 0;
+ SASL_INITIATE = 2;
+ SASL_CHALLENGE = 3;
+ SASL_RESPONSE = 4;
+ TLS_HANDSHAKE = 5;
+ TOKEN_EXCHANGE = 6;
+ }
+
+ message SaslMechanism {
+ // The SASL mechanism, i.e. 'PLAIN' or 'GSSAPI'.
+ required string mechanism = 2;
+
+ // Deprecated: no longer used.
+ // optional string method = 1;
+ // optional bytes challenge = 5 [(REDACT) = true];
+ }
+
+ // When the client sends its NEGOTIATE step message, it sends its set of
+ // supported RPC system features. In the response to this message, the server
+ // sends back its own. This allows the two peers to agree on whether newer
+ // extensions of the RPC system may be used on this connection. We use a list
+ // of features rather than a simple version number to make it easier for the
+ // Java and C++ clients to implement features in different orders while still
+ // maintaining compatibility, as well as to simplify backporting of features
+ // out-of-order.
+ repeated RpcFeatureFlag supported_features = 1;
+
+ // The current negotiation step.
+ required NegotiateStep step = 2;
+
+ // The SASL token, containing either the challenge during the SASL_CHALLENGE
+ // step, or the response during the SASL_RESPONSE step.
+ optional bytes token = 3 [(REDACT) = true];
+
+ // During the TLS_HANDSHAKE step, contains the TLS handshake message.
+ optional bytes tls_handshake = 5 [(REDACT) = true];
+
+ // The tls-server-end-point channel bindings as specified in RFC 5929. Sent
+ // from the server to the client during the SASL_SUCCESS step when the
+ // Kerberos (GSSAPI) SASL mechanism is used with TLS, in order to bind the
+ // Kerberos authenticated channel to the TLS channel. The value is integrity
+ // protected through SASL. The client is responsible for validating that the
+ // value matches the expected value.
+ optional bytes channel_bindings = 6 [(REDACT) = true];
+
+ // A random nonce sent from the server to the client during the SASL_SUCCESS
+ // step when the Kerberos (GSSAPI) SASL mechanism is used with TLS. The nonce
+ // must be sent back to the server, wrapped in SASL integrity protection, as
+ // part of the connection context.
+ optional bytes nonce = 9 [(REDACT) = true];
+
+ // During the NEGOTIATE step, contains the supported SASL mechanisms.
+ // During the SASL_INITIATE step, contains the single chosen SASL mechanism.
+ repeated SaslMechanism sasl_mechanisms = 4;
+
+ // During the client to server NEGOTIATE step, contains the supported authentication types.
+ // During the server to client NEGOTIATE step, contains the chosen authentication type.
+ repeated AuthenticationTypePB authn_types = 7;
+
+ // During the TOKEN_EXCHANGE step, contains the client's signed authentication token.
+ optional security.SignedTokenPB authn_token = 8;
+}
+
+message RemoteMethodPB {
+ // Service name for the RPC layer.
+ // The client created a proxy with this service name.
+ // Example: kudu.rpc_test.CalculatorService
+ required string service_name = 1;
+
+ // Name of the RPC method.
+ required string method_name = 2;
+};
+
+// The Id of a retriable RPC, whose results should be tracked on the server (see result_tracker.h).
+// This also includes some information that is useful for execution/garbage collection.
+message RequestIdPB {
+ // The (globally unique) id of the client performing this RPC.
+ required string client_id = 1;
+
+ // The (per-client unique) sequence number of this RPC.
+ required int64 seq_no = 2;
+
+ // The sequence number of the first RPC that has not been marked as completed by the client.
+ // Unset if there isn't an incomplete RPC.
+ required int64 first_incomplete_seq_no = 3;
+
+ // The number of times this RPC has been tried.
+ // Set to 1 in the first attempt.
+ required int64 attempt_no = 4;
+}
+
+// The header for the RPC request frame.
+message RequestHeader {
+ // A sequence number that uniquely identifies a call to a single remote server. This number is
+ // sent back in the Response and allows to match it to the original Request.
+ // Hadoop specifies a uint32 and casts it to a signed int. That is counterintuitive, so we use an
+ // int32 instead. Allowed values (inherited from Hadoop):
+ // 0 through INT32_MAX: Regular RPC call IDs.
+ // -2: Invalid call ID.
+ // -3: Connection context call ID.
+ // -33: SASL negotiation call ID.
+ //
+ // NOTE: these calls must be increasing but may have gaps.
+ required int32 call_id = 3;
+
+ // RPC method being invoked.
+ // Not used for "connection setup" calls.
+ optional RemoteMethodPB remote_method = 6;
+
+ // Propagate the timeout as specified by the user. Note that, since there is some
+ // transit time between the client and server, if you wait exactly this amount of
+ // time and then respond, you are likely to cause a timeout on the client.
+ optional uint32 timeout_millis = 10;
+
+ // Feature flags that the service must support in order to properly interpret this
+ // request. The client can pass any set of flags, and if the server doesn't
+ // support any of them, then it will fail the request.
+ //
+ // NOTE: these are for evolving features at the level of the application, not
+ // the RPC framework. Hence, we have to use a generic int type rather than a
+ // particular enum.
+ // NOTE: the server will only interpret this field if it supports the
+ // APPLICATION_FEATURE_FLAGS flag.
+ repeated uint32 required_feature_flags = 11;
+
+ // The unique id of this request, if it's retriable and if the results are to be tracked.
+ // The request id is unique per logical request, i.e. retries of the same RPC must have the
+ // same request id.
+ // Note that this is different from 'call_id' in that a call_id is unique to a server while a
+ // request_id is unique to a logical request (i.e. the request_id remains the same when a request
+ // is retried on a different server).
+ // Optional for requests that are naturally idempotent or to maintain compatibility with
+ // older clients for requests that are not.
+ optional RequestIdPB request_id = 15;
+
+ // Byte offsets for side cars in the main body of the request message.
+ // These offsets are counted AFTER the message header, i.e., offset 0
+ // is the first byte after the bytes for this protobuf.
+ repeated uint32 sidecar_offsets = 16;
+}
+
+message ResponseHeader {
+ required int32 call_id = 1;
+
+ // If this is set, then this is an error response and the
+ // response message will be of type ErrorStatusPB instead of
+ // the expected response type.
+ optional bool is_error = 2 [ default = false ];
+
+ // Byte offsets for side cars in the main body of the response message.
+ // These offsets are counted AFTER the message header, i.e., offset 0
+ // is the first byte after the bytes for this protobuf.
+ repeated uint32 sidecar_offsets = 3;
+}
+
+// Sent as response when is_error == true.
+message ErrorStatusPB {
+
+ // These codes have all been inherited from Hadoop's RPC mechanism.
+ enum RpcErrorCodePB {
+ FATAL_UNKNOWN = 10;
+
+ // Non-fatal RPC errors. Connection should be left open for future RPC calls.
+ //------------------------------------------------------------
+ // The application generated an error status. See the message field for
+ // more details.
+ ERROR_APPLICATION = 1;
+
+ // The specified method was not valid.
+ ERROR_NO_SUCH_METHOD = 2;
+
+ // The specified service was not valid.
+ ERROR_NO_SUCH_SERVICE = 3;
+
+ // The server is overloaded - the client should try again shortly.
+ ERROR_SERVER_TOO_BUSY = 4;
+
+ // The request parameter was not parseable, was missing required fields,
+ // or the server does not support the required feature flags.
+ ERROR_INVALID_REQUEST = 5;
+
+ // The server might have previously received this request but its response is no
+ // longer cached. It's unknown whether the request was executed or not.
+ ERROR_REQUEST_STALE = 6;
+
+ // The server is not able to complete the connection or request at this
+ // time. The client may try again later.
+ ERROR_UNAVAILABLE = 7;
+
+ // FATAL_* errors indicate that the client should shut down the connection.
+ //------------------------------------------------------------
+ // The RPC server is already shutting down.
+ FATAL_SERVER_SHUTTING_DOWN = 11;
+ // Fields of RpcHeader are invalid.
+ FATAL_INVALID_RPC_HEADER = 12;
+ // Could not deserialize RPC request.
+ FATAL_DESERIALIZING_REQUEST = 13;
+ // IPC Layer version mismatch.
+ FATAL_VERSION_MISMATCH = 14;
+ // Auth failed.
+ FATAL_UNAUTHORIZED = 15;
+
+ // The authentication token is invalid or expired;
+ // the client should obtain a new one.
+ FATAL_INVALID_AUTHENTICATION_TOKEN = 16;
+ }
+
+ required string message = 1;
+
+ // TODO: Make code required?
+ optional RpcErrorCodePB code = 2; // Specific error identifier.
+
+ // If the request is failed due to an unsupported feature flag, the particular
+ // flag(s) that were not supported will be sent back to the client.
+ repeated uint32 unsupported_feature_flags = 3;
+
+ // Allow extensions. When the RPC returns ERROR_APPLICATION, the server
+ // should also fill in exactly one of these extension fields, which contains
+ // more details on the service-specific error.
+ extensions 100 to max;
+}
+
+extend google.protobuf.MethodOptions {
+ // An option for RPC methods that allows to set whether that method's
+ // RPC results should be tracked with a ResultTracker.
+ optional bool track_rpc_result = 50006 [default=false];
+
+ // An option to set the authorization method for this particular
+ // RPC method. If this is not specified, the service's 'default_authz_method'
+ // is used.
+ optional string authz_method = 50007;
+}
+
+extend google.protobuf.ServiceOptions {
+ // Set the default authorization method for the RPCs in this service.
+ // If this is not set, then the default authorization is to allow all
+ // RPCs.
+ optional string default_authz_method = 50007;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
new file mode 100644
index 0000000..7685903
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Protobuf used for introspection of RPC services (eg listing in-flight RPCs,
+// reflection, etc)
+syntax = "proto2";
+
+package kudu.rpc;
+
+option java_package = "org.apache.kudu";
+
+import "kudu/rpc/rpc_header.proto";
+
+message RpcCallInProgressPB {
+ required RequestHeader header = 1;
+ optional string trace_buffer = 2;
+ optional uint64 micros_elapsed = 3;
+
+ enum State {
+ UNKNOWN = 999;
+
+ // States for OutboundCall
+ ON_OUTBOUND_QUEUE = 1;
+ SENDING = 2;
+ SENT = 3;
+ TIMED_OUT = 4;
+ FINISHED_ERROR = 5;
+ FINISHED_SUCCESS = 6;
+ NEGOTIATION_TIMED_OUT = 7;
+ FINISHED_NEGOTIATION_ERROR = 8;
+ CANCELLED = 9;
+
+ // TODO(todd): add states for InboundCall
+ }
+
+ optional State state = 4;
+}
+
+message RpcConnectionPB {
+ enum StateType {
+ UNKNOWN = 999;
+ NEGOTIATING = 0; // Connection is still being negotiated.
+ OPEN = 1; // Connection is active.
+ };
+
+ required string remote_ip = 1;
+ required StateType state = 2;
+ // TODO: swap out for separate fields
+ optional string remote_user_credentials = 3;
+ repeated RpcCallInProgressPB calls_in_flight = 4;
+ optional int64 outbound_queue_size = 5;
+}
+
+message DumpRunningRpcsRequestPB {
+ optional bool include_traces = 1 [ default = false ];
+}
+
+message DumpRunningRpcsResponsePB {
+ repeated RpcConnectionPB inbound_connections = 1;
+ repeated RpcConnectionPB outbound_connections = 2;
+}
+
+//------------------------------------------------------------
+
+// A particular TraceMetric key/value pair from a sampled RPC.
+message TraceMetricPB {
+ // A '.'-separated path through the parent-child trace hierarchy.
+ optional string child_path = 1;
+ optional string key = 2;
+ optional int64 value = 3;
+}
+
+// A single sampled RPC call.
+message RpczSamplePB {
+ // The original request header.
+ optional RequestHeader header = 1;
+ // The stringified request trace.
+ optional string trace = 2;
+ // The number of millis that this call took to complete.
+ optional int32 duration_ms = 3;
+ // The metrics from the sampled trace.
+ repeated TraceMetricPB metrics = 4;
+}
+
+// A set of samples for a particular RPC method.
+message RpczMethodPB {
+ required string method_name = 1;
+ repeated RpczSamplePB samples = 2;
+}
+
+// Request and response for dumping previously sampled RPC calls.
+message DumpRpczStoreRequestPB {
+}
+message DumpRpczStoreResponsePB {
+ repeated RpczMethodPB methods = 1;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_service.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_service.h b/be/src/kudu/rpc/rpc_service.h
new file mode 100644
index 0000000..dcaa9c1
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_service.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_SERVICE_H_
+#define KUDU_RPC_SERVICE_H_
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethod;
+struct RpcMethodInfo;
+class InboundCall;
+
+class RpcService : public RefCountedThreadSafe<RpcService> {
+ public:
+ virtual ~RpcService() {}
+
+ // Enqueue a call for processing.
+ // On failure, the RpcService::QueueInboundCall() implementation is
+ // responsible for responding to the client with a failure message.
+ virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) = 0;
+
+ virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+ return nullptr;
+ }
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_SERVICE_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.cc b/be/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..b4de678
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include <google/protobuf/repeated_field.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/status.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Sidecar that simply wraps a Slice. The data associated with the slice is therefore not
+// owned by this class, and it's the caller's responsibility to ensure it has a lifetime
+// at least as long as this sidecar.
+class SliceSidecar : public RpcSidecar {
+ public:
+ explicit SliceSidecar(Slice slice) : slice_(slice) { }
+ Slice AsSlice() const override { return slice_; }
+
+ private:
+ const Slice slice_;
+};
+
+class FaststringSidecar : public RpcSidecar {
+ public:
+ explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { }
+ Slice AsSlice() const override { return *data_; }
+
+ private:
+ const unique_ptr<faststring> data_;
+};
+
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+ return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
+}
+
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+ return unique_ptr<RpcSidecar>(new SliceSidecar(slice));
+}
+
+
+Status RpcSidecar::ParseSidecars(
+ const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+ Slice buffer, Slice* sidecars) {
+ if (offsets.size() == 0) return Status::OK();
+
+ int last = offsets.size() - 1;
+ if (last >= TransferLimits::kMaxSidecars) {
+ return Status::Corruption(strings::Substitute(
+ "Received $0 additional payload slices, expected at most $1",
+ last, TransferLimits::kMaxSidecars));
+ }
+
+ if (buffer.size() > TransferLimits::kMaxTotalSidecarBytes) {
+ return Status::Corruption(strings::Substitute(
+ "Received $0 payload bytes, expected at most $1",
+ buffer.size(), TransferLimits::kMaxTotalSidecarBytes));
+ }
+
+ for (int i = 0; i < last; ++i) {
+ int64_t cur_offset = offsets.Get(i);
+ int64_t next_offset = offsets.Get(i + 1);
+ if (next_offset > buffer.size()) {
+ return Status::Corruption(strings::Substitute(
+ "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+ " has length $2, but the entire message has length $3",
+ i, cur_offset, (next_offset - cur_offset), buffer.size()));
+ }
+ if (next_offset < cur_offset) {
+ return Status::Corruption(strings::Substitute(
+ "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+ " but ends before that at offset $1.", i, cur_offset, next_offset));
+ }
+
+ sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+ }
+
+ int64_t cur_offset = offsets.Get(last);
+ if (cur_offset > buffer.size()) {
+ return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+ "starts at offset $1after message ends (message length $2).", last,
+ cur_offset, buffer.size()));
+ }
+ sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+
+ return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_sidecar.h b/be/src/kudu/rpc/rpc_sidecar.h
new file mode 100644
index 0000000..bfbfcea
--- /dev/null
+++ b/be/src/kudu/rpc/rpc_sidecar.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_RPC_SIDECAR_H
+#define KUDU_RPC_RPC_SIDECAR_H
+
+#include <memory>
+
+#include <google/protobuf/repeated_field.h> // IWYU pragma: keep
+#include <google/protobuf/stubs/port.h>
+
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+class Status;
+class faststring;
+
+namespace rpc {
+
+// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
+// without extra copies. In other words, whenever a protobuf would have a large field
+// where additional copies become expensive, one may opt instead to use an RpcSidecar.
+//
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
+//
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the
+// RPC response on the server side, an index for that sidecar is returned. This index must
+// then in some way (i.e., via protobuf) be communicated to the recipient.
+//
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
+class RpcSidecar {
+ public:
+ static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+ static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
+
+ // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
+ // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
+ // will be filled from index 0.
+ // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+ static Status ParseSidecars(
+ const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+ Slice buffer, Slice* sidecars);
+
+ // Returns a Slice representation of the sidecar's data.
+ virtual Slice AsSlice() const = 0;
+ virtual ~RpcSidecar() { }
+};
+
+} // namespace rpc
+} // namespace kudu
+
+
+#endif /* KUDU_RPC_RPC_SIDECAR_H */