You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:57 UTC
[45/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..06fb2b8
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdlib>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Slice;
+class faststring;
+
+namespace security {
+class TlsContext;
+}
+
+namespace rpc {
+
+// Class for doing KRPC negotiation with a remote server over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class ClientNegotiation {
+ public:
+ // Creates a new client negotiation instance, taking ownership of the
+ // provided socket. After completing the negotiation process by setting the
+ // desired options and calling Negotiate(), the socket can be retrieved with
+ // 'release_socket'.
+ //
+ // The provided TlsContext must outlive this negotiation instance.
+ ClientNegotiation(std::unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ boost::optional<security::SignedTokenPB> authn_token,
+ RpcEncryption encryption,
+ std::string sasl_proto_name);
+
+ // Enable PLAIN authentication.
+ // Must be called before Negotiate().
+ Status EnablePlain(const std::string& user,
+ const std::string& pass);
+
+ // Enable GSSAPI authentication.
+ // Must be called before Negotiate().
+ Status EnableGSSAPI();
+
+ // Returns mechanism negotiated by this connection.
+ // Must be called after Negotiate().
+ SaslMechanism::Type negotiated_mechanism() const;
+
+ // Returns the negotiated authentication type for the connection.
+ // Must be called after Negotiate().
+ AuthenticationType negotiated_authn() const {
+ DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+ return negotiated_authn_;
+ }
+
+ // Returns true if TLS was negotiated.
+ // Must be called after Negotiate().
+ bool tls_negotiated() const {
+ return tls_negotiated_;
+ }
+
+ // Returns a copy of the set of RPC system features supported by the remote server.
+ // Must be called after Negotiate(): the set is only filled in during negotiation.
+ std::set<RpcFeatureFlag> server_features() const {
+ return server_features_;
+ }
+
+ // Returns the set of RPC system features supported by the remote server.
+ // Must be called after Negotiate().
+ // Subsequent calls to this method or server_features() will return an empty set.
+ std::set<RpcFeatureFlag> take_server_features() {
+ return std::move(server_features_);
+ }
+
+ // Specify the fully-qualified domain name of the remote server.
+ // Must be called before Negotiate(). Required for some mechanisms.
+ void set_server_fqdn(const std::string& domain_name);
+
+ // Set deadline for connection negotiation.
+ void set_deadline(const MonoTime& deadline);
+
+ Socket* socket() { return socket_.get(); }  // Non-owning pointer; null once release_socket() was called.
+
+ // Takes and returns the socket owned by this client negotiation. The caller
+ // will own the socket after this call, and the negotiation instance should no
+ // longer be used. Must be called after Negotiate(). Subsequent calls to this
+ // method or socket() will return a null pointer.
+ std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
+
+ // Negotiate with the remote server. Should only be called once per
+ // ClientNegotiation and socket instance, after all options have been set.
+ //
+ // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+ // another non-OK status.
+ Status Negotiate(std::unique_ptr<ErrorStatusPB>* rpc_error = nullptr);
+
+ // SASL callback for plugin options, supported mechanisms, etc.
+ // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+ int GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len);
+
+ // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+ int SimpleCb(int id, const char** result, unsigned* len);
+
+ // SASL callback for SASL_CB_PASS
+ int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+ // Check that GSSAPI/Kerberos credentials are available.
+ static Status CheckGSSAPI() WARN_UNUSED_RESULT;
+
+ private:
+
+ // Encode and send the specified negotiate request message to the server.
+ Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
+
+ // Receive a negotiate response message from the server, deserializing it into 'msg'.
+ // Validates that the response is not an error.
+ Status RecvNegotiatePB(NegotiatePB* msg,
+ faststring* buffer,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ // Parse error status message from raw bytes of an ErrorStatusPB.
+ Status ParseError(const Slice& err_data,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ Status SendConnectionHeader() WARN_UNUSED_RESULT;
+
+ // Initialize the SASL client negotiation instance.
+ Status InitSaslClient() WARN_UNUSED_RESULT;
+
+ // Send a NEGOTIATE step message to the server.
+ Status SendNegotiate() WARN_UNUSED_RESULT;
+
+ // Handle NEGOTIATE step response from the server.
+ Status HandleNegotiate(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Send a TLS_HANDSHAKE request message to the server with the provided token.
+ Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
+
+ // Handle a TLS_HANDSHAKE response message from the server.
+ Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Authenticate to the server using SASL.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateBySasl(faststring* recv_buf,
+ std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+ // Authenticate to the server using a token.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateByToken(faststring* recv_buf,
+ std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
+
+ // Send an SASL_INITIATE message to the server.
+ // Returns:
+ // Status::OK if the SASL_SUCCESS message is expected next.
+ // Status::Incomplete if the SASL_CHALLENGE message is expected next.
+ // Any other status indicates an error.
+ Status SendSaslInitiate() WARN_UNUSED_RESULT;
+
+ // Send a SASL_RESPONSE message to the server.
+ Status SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) WARN_UNUSED_RESULT;
+
+ // Handle case when server sends SASL_CHALLENGE response.
+ // Returns:
+ // Status::OK if a SASL_SUCCESS message is expected next.
+ // Status::Incomplete if another SASL_CHALLENGE message is expected.
+ // Any other status indicates an error.
+ Status HandleSaslChallenge(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Handle case when server sends SASL_SUCCESS response.
+ Status HandleSaslSuccess(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+ // Perform a client-side step of the SASL negotiation.
+ // Input is what came from the server. Output is what we will send back to the server.
+ // Returns:
+ // Status::OK if sasl_client_step returns SASL_OK.
+ // Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+ // otherwise returns an appropriate error status.
+ Status DoSaslStep(const std::string& in, const char** out, unsigned* out_len) WARN_UNUSED_RESULT;
+
+ Status SendConnectionContext() WARN_UNUSED_RESULT;
+
+ // The socket to the remote server. Owned until release_socket() hands it to the caller.
+ std::unique_ptr<Socket> socket_;
+
+ // SASL state.
+ std::vector<sasl_callback_t> callbacks_;
+ std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+ SaslHelper helper_;
+ boost::optional<std::string> nonce_;
+
+ // TLS state. 'tls_context_' is not owned; it must outlive this object (see the constructor).
+ const security::TlsContext* tls_context_;
+ security::TlsHandshake tls_handshake_;
+ const RpcEncryption encryption_;
+ bool tls_negotiated_;
+
+ // TSK state.
+ boost::optional<security::SignedTokenPB> authn_token_;
+
+ // Authentication state.
+ std::string plain_auth_user_;
+ std::string plain_pass_;
+ std::unique_ptr<sasl_secret_t, decltype(std::free)*> psecret_;  // Deleter is a std::free-style function pointer.
+
+ // The set of features advertised by the client. Filled in when we send
+ // the first message. This is not necessarily constant since some features
+ // may be dynamically enabled.
+ std::set<RpcFeatureFlag> client_features_;
+
+ // The set of features supported by the server. Filled in during negotiation.
+ std::set<RpcFeatureFlag> server_features_;
+
+ // The authentication type. Filled in during negotiation.
+ AuthenticationType negotiated_authn_;
+
+ // The SASL mechanism used by the connection. Filled in during negotiation.
+ SaslMechanism::Type negotiated_mech_;
+
+ // The SASL protocol name that is used for the SASL negotiation.
+ const std::string sasl_proto_name_;
+
+ // Negotiation timeout deadline.
+ MonoTime deadline_;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
new file mode 100644
index 0000000..1632dd3
--- /dev/null
+++ b/be/src/kudu/rpc/connection.cc
@@ -0,0 +1,767 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/connection.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <iostream>
+#include <memory>
+#include <set>
+#include <string>
+#include <type_traits>
+
+#include <boost/intrusive/detail/list_iterator.hpp>
+#include <boost/intrusive/list.hpp>
+#include <ev.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/slice.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+using std::includes;
+using std::set;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+typedef OutboundCall::Phase Phase;
+
+///
+/// Connection: wraps an already-created socket. A new connection starts out un-negotiated,
+/// not registered with the event loop, and with its last-activity time set to "now".
+Connection::Connection(ReactorThread *reactor_thread,
+ Sockaddr remote,
+ unique_ptr<Socket> socket,  // ownership is moved into socket_
+ Direction direction,
+ CredentialsPolicy policy)
+ : reactor_thread_(reactor_thread),
+ remote_(remote),
+ socket_(std::move(socket)),
+ direction_(direction),
+ last_activity_time_(MonoTime::Now()),
+ is_epoll_registered_(false),  // set by EpollRegister(), cleared again by Shutdown()
+ next_call_id_(1),
+ credentials_policy_(policy),
+ negotiation_complete_(false),  // while false, QueueOutbound() queues transfers without starting write_io_
+ is_confidential_(false),
+ scheduled_for_shutdown_(false) {
+}
+
+Status Connection::SetNonBlocking(bool enabled) {  // Forwards the request to the owned socket.
+ return socket_->SetNonBlocking(enabled);  // The socket's Status is returned unchanged.
+}
+
+void Connection::EpollRegister(ev::loop_ref& loop) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  DVLOG(4) << "Registering connection for epoll: " << ToString();
+  const auto fd = socket_->GetFd();  // both watchers observe the same descriptor
+  // Writer: always bound, but started now only for an already-negotiated client connection.
+  write_io_.set(loop);
+  write_io_.set(fd, ev::WRITE);
+  write_io_.set<Connection, &Connection::WriteHandler>(this);
+  if ((direction_ == CLIENT) && negotiation_complete_) write_io_.start();
+  read_io_.set(loop);
+  read_io_.set(fd, ev::READ);
+  read_io_.set<Connection, &Connection::ReadHandler>(this);
+  read_io_.start();  // Reader: started unconditionally.
+  is_epoll_registered_ = true;
+}
+
+Connection::~Connection() {
+ // Precondition: the outbound queue is empty (Shutdown() drains and deletes every queued transfer).
+ CHECK(outbound_transfers_.begin() == outbound_transfers_.end());
+
+ // Precondition: Shutdown() already ran -- it stops both watchers and then
+ // clears is_epoll_registered_. Otherwise our destructor would end up calling
+ // read_io_.stop() and write_io_.stop() from a possibly non-reactor thread
+ // context, which can make all hell break loose with libev.
+ CHECK(!is_epoll_registered_);
+}
+
+bool Connection::Idle() const {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  // The connection is idle only if all of the conditions below hold. They
+  // are evaluated in order and evaluation stops at the first one that fails.
+
+  // (1) No inbound message is partially received.
+  InboundTransfer* in_flight = inbound_.get();
+  const bool receiving = (in_flight != nullptr) && in_flight->TransferStarted();
+  if (receiving) {
+    return false;
+  }
+
+  // (2) Nothing is queued for sending.
+  // (3) No sent call is still waiting for its response -- such a connection
+  //     must not be killed.
+  // (4) No inbound call is still being handled.
+  const bool has_pending_work = !outbound_transfers_.empty() ||
+                                !awaiting_response_.empty() ||
+                                !calls_being_handled_.empty();
+  if (has_pending_work) {
+    return false;
+  }
+
+  // (5) Connection negotiation has completed.
+  return negotiation_complete_;
+}
+
+void Connection::Shutdown(const Status &status,
+ unique_ptr<ErrorStatusPB> rpc_error) {  // rpc_error may be null; when set, every failed call receives its own copy
+ DCHECK(reactor_thread_->IsCurrentThread());
+ shutdown_status_ = status.CloneAndPrepend("RPC connection failed");  // QueueOutbound()/QueueOutboundCall() reject new work once this is non-OK
+ // Warn when a partially received inbound message is about to be discarded.
+ if (inbound_ && inbound_->TransferStarted()) {
+ double secs_since_active =
+ (reactor_thread_->cur_time() - last_activity_time_).ToSeconds();
+ LOG(WARNING) << "Shutting down " << ToString()
+ << " with pending inbound data ("
+ << inbound_->StatusAsString() << ", last active "
+ << HumanReadableElapsedTime::ToShortString(secs_since_active)
+ << " ago, status=" << status.ToString() << ")";
+ }
+
+ // Fail every call that was sent and is still awaiting a response. Entries whose 'call' is null are only returned to the pool.
+ for (const car_map_t::value_type &v : awaiting_response_) {
+ CallAwaitingResponse *c = v.second;
+ if (c->call) {
+ // Make sure every awaiting call receives the error info, if any (a fresh copy per call).
+ unique_ptr<ErrorStatusPB> error;
+ if (rpc_error) {
+ error.reset(new ErrorStatusPB(*rpc_error));
+ }
+ c->call->SetFailed(status,  // the caller's original status, not the prefixed shutdown_status_
+ negotiation_complete_ ? Phase::REMOTE_CALL
+ : Phase::CONNECTION_NEGOTIATION,
+ std::move(error));
+ }
+ // And we must return the CallAwaitingResponse to the pool
+ car_pool_.Destroy(c);
+ }
+ awaiting_response_.clear();  // the map now holds only destroyed entries
+
+ // Remove and delete every transfer that is still queued for sending.
+ while (!outbound_transfers_.empty()) {
+ OutboundTransfer *t = &outbound_transfers_.front();
+ outbound_transfers_.pop_front();
+ delete t;
+ }
+ // Stop both event watchers, record that we are unregistered, then close the socket if there is one.
+ read_io_.stop();
+ write_io_.stop();
+ is_epoll_registered_ = false;  // the destructor CHECKs this flag
+ if (socket_) {
+ WARN_NOT_OK(socket_->Close(), "Error closing socket");
+ }
+}
+
+void Connection::QueueOutbound(gscoped_ptr<OutboundTransfer> transfer) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  // A non-OK shutdown status means the connection was shut down: fail the
+  // transfer with that status instead of queueing it.
+  const bool already_shut_down = !shutdown_status_.ok();
+  if (already_shut_down) {
+    transfer->Abort(shutdown_status_);
+    return;
+  }
+
+  DVLOG(3) << "Queueing transfer: " << transfer->HexDump();
+
+  // The outbound queue takes over the object, so give up our smart pointer.
+  OutboundTransfer* queued = transfer.release();
+  outbound_transfers_.push_back(*queued);
+
+  // Writes are only driven once negotiation is over; if the write watcher
+  // went quiet because nothing was being sent, wake it up again.
+  if (negotiation_complete_ && !write_io_.is_active()) write_io_.start();
+}
+
+Connection::CallAwaitingResponse::~CallAwaitingResponse() {  // Debug-only check; the destructor does no other work.
+ DCHECK(conn->reactor_thread_->IsCurrentThread());  // entries must be destroyed on the owning connection's reactor thread
+}
+
+void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) {
+  // Nothing left to wait for (second stage, or a single-stage timeout): the call timed out.
+  if (!(remaining_timeout > 0)) {
+    conn->HandleOutboundCallTimeout(this);
+    return;
+  }
+  // First stage: warn if this handler ran over a second late, then re-arm once for the rest.
+  if (watcher.remaining() < -1.0) {
+    LOG(WARNING) << "RPC call timeout handler was delayed by "
+                 << -watcher.remaining() << "s! This may be due to a process-wide "
+                 << "pause such as swapping, logging-related delays, or allocator lock "
+                 << "contention. Will allow an additional "
+                 << remaining_timeout << "s for a response.";
+  }
+  watcher.set(remaining_timeout, 0);
+  watcher.start();
+  remaining_timeout = 0;
+}
+
+void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  DCHECK(car->call);
+  // A finished call cannot time out: once a response is processed in
+  // HandleCallResponse(), the car is destroyed and its timer stops with it.
+  DCHECK(!car->call->IsFinished());
+
+  // Report the timeout against whichever phase the connection is in.
+  const Phase phase = negotiation_complete_ ? Phase::REMOTE_CALL
+                                            : Phase::CONNECTION_NEGOTIATION;
+  car->call->SetTimedOut(phase);
+
+  // Test hook: optionally inject a cancellation while the call is TIMED_OUT.
+  MaybeInjectCancellation(car->call);
+
+  // Release our reference so the request memory can be freed as soon as the
+  // original caller, having seen the timeout, lets go of the call as well.
+  car->call.reset();
+
+  // The CallAwaitingResponse itself deliberately stays in the map: a late
+  // response may still arrive from the server, and the now-NULL 'call' tells
+  // the response processing code that the call already timed out, so that no
+  // spurious log message is produced for it.
+}
+
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+  // Cancelling only drops this connection's reference to the call.
+  CallAwaitingResponse* pending = FindPtrOrNull(awaiting_response_, call->call_id());
+  if (pending == nullptr) return;  // no entry for this call ID: nothing to drop
+  // 'pending->call' is already NULL when the call has timed out earlier.
+  DCHECK(!pending->call || pending->call.get() == call.get());
+  pending->call.reset();
+}
+
+// Test hook: queues a cancellation of 'call' when the call itself reports that one should be injected.
+inline void Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) {
+  const bool inject = PREDICT_FALSE(call->ShouldInjectCancellation());
+  if (!inject) return;
+  reactor_thread_->reactor()->messenger()->QueueCancellation(call);
+}
+
+// Transfer callbacks for an outbound call request. When the request has been
+// written out completely, the OutboundCall is moved to the SENT state.
+// An instance deletes itself from whichever callback fires.
+struct CallTransferCallbacks : public TransferCallbacks {
+ public:
+  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call, Connection *conn)
+      : call_(std::move(call)), conn_(conn) {}
+
+  virtual void NotifyTransferFinished() OVERRIDE {
+    if (!call_->IsFinished()) {
+      call_->SetSent();
+      // Test hook: optionally inject a cancellation in the 'SENT' state.
+      conn_->MaybeInjectCancellation(call_);
+    } else {
+      // The call finished while its bytes were still in flight; only a
+      // timeout or a cancellation can do that.
+      // TODO: it would be better to cancel a timed-out transfer while it is
+      // still queued, but a partially sent transfer would still end up here.
+      DCHECK(call_->IsTimedOut() || call_->IsCancelled());
+    }
+    delete this;
+  }
+
+  virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
+    VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: "
+            << status.ToString();
+    delete this;
+  }
+
+ private:
+  shared_ptr<OutboundCall> call_;
+  Connection* conn_;
+};
+
+void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {  // Assigns a call ID, arms the timeout, records the call as awaiting a response, and queues its bytes.
+ DCHECK(call);
+ DCHECK_EQ(direction_, CLIENT);
+ DCHECK(reactor_thread_->IsCurrentThread());
+
+ if (PREDICT_FALSE(!shutdown_status_.ok())) {
+ // Already shutdown: fail the call right away with the stored shutdown status.
+ call->SetFailed(shutdown_status_,
+ negotiation_complete_ ? Phase::REMOTE_CALL
+ : Phase::CONNECTION_NEGOTIATION);
+ return;
+ }
+
+ // At this point the call has a serialized request, but no call header, since we haven't
+ // yet assigned a call ID.
+ DCHECK(!call->call_id_assigned());
+
+ // We shouldn't reach this point if 'call' was requested to be cancelled.
+ DCHECK(!call->cancellation_requested());
+
+ // Assign the call ID.
+ int32_t call_id = GetNextCallId();
+ call->set_call_id(call_id);
+
+ // Serialize the actual bytes to be put on the wire.
+ TransferPayload tmp_slices;
+ size_t n_slices = call->SerializeTo(&tmp_slices);
+
+ call->SetQueued();
+
+ // Test cancellation when 'call_' is in 'ON_OUTBOUND_QUEUE' state.
+ MaybeInjectCancellation(call);
+
+ scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct()));
+ car->conn = this;
+ car->call = call;
+
+ // Set up the timeout timer (only when the controller carries an initialized timeout).
+ const MonoDelta &timeout = call->controller()->timeout();
+ if (timeout.Initialized()) {
+ reactor_thread_->RegisterTimeout(&car->timeout_timer);
+ car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
+ &CallAwaitingResponse::HandleTimeout>(car.get());
+
+ // For calls with a timeout of at least 500ms, we actually run the timeout
+ // handler in two stages. The first timeout fires with a timeout 10% less
+ // than the user-specified one. It then schedules a second timeout for the
+ // remaining amount of time.
+ //
+ // The purpose of this two-stage timeout is to be more robust when the client
+ // has some process-wide pause, such as lock contention in tcmalloc, or a
+ // reactor callback that blocks in glog. Consider the following case:
+ //
+ // T = 0s user issues an RPC with 5 second timeout
+ // T = 0.5s - 6s process is blocked
+ // T = 6s process unblocks, and the timeout fires (1s late)
+ //
+ // Without the two-stage timeout, we would determine that the call had timed out,
+ // even though it's likely that the response is waiting on our TCP socket.
+ // With the two-stage timeout, we'll end up with:
+ //
+ // T = 0s user issues an RPC with 5 second timeout
+ // T = 0.5s - 6s process is blocked
+ // T = 6s process unblocks, and the first-stage timeout fires (1.5s late)
+ // T = 6s - 6.5s time for the client to read the response which is waiting
+ // T = 6.5s if the response was not actually available, we'll time out here
+ //
+ // We don't bother with this logic for calls with very short timeouts - assumedly
+ // a user setting such a short RPC timeout is well equipped to handle one.
+ double time = timeout.ToSeconds();
+ if (time >= 0.5) {
+ car->remaining_timeout = time * 0.1;  // the second stage gets the final 10%
+ time -= car->remaining_timeout;  // the first stage fires after the other 90%
+ } else {
+ car->remaining_timeout = 0;  // single stage: HandleTimeout() times the call out on its first firing
+ }
+
+ car->timeout_timer.set(time, 0);
+ car->timeout_timer.start();
+ }
+
+ TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);  // 'call' is moved-from below this line; only call_id is used
+ awaiting_response_[call_id] = car.release();  // the map now holds the raw entry
+ QueueOutbound(gscoped_ptr<OutboundTransfer>(
+ OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
+}
+
+// Transfer callbacks for a server-side call response. The object owns the
+// InboundCall, so the call and all of its memory are released when the
+// callbacks object deletes itself, i.e. once the response transfer has
+// either finished or been aborted.
+struct ResponseTransferCallbacks : public TransferCallbacks {
+ public:
+  ResponseTransferCallbacks(gscoped_ptr<InboundCall> call, Connection *conn)
+      : call_(std::move(call)),
+        conn_(conn) {
+  }
+
+  ~ResponseTransferCallbacks() {
+    // The call is done: take it out of the connection's map of calls being handled.
+    InboundCall *registered = EraseKeyReturnValuePtr(
+        &conn_->calls_being_handled_, call_->call_id());
+    DCHECK_EQ(registered, call_.get());
+  }
+
+  virtual void NotifyTransferFinished() OVERRIDE {
+    delete this;
+  }
+
+  virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
+    LOG(WARNING) << "Connection torn down before " <<
+      call_->ToString() << " could send its response";
+    delete this;
+  }
+
+ private:
+  gscoped_ptr<InboundCall> call_;
+  Connection *conn_;
+};
+
+// Reactor task that appends a transfer to a connection's outbound queue.
+// The task deletes itself once it has run or been aborted.
+class QueueTransferTask : public ReactorTask {
+ public:
+  QueueTransferTask(gscoped_ptr<OutboundTransfer> transfer, Connection *conn)
+      : transfer_(std::move(transfer)), conn_(conn) {}
+
+  // Hands the transfer over to the connection.
+  virtual void Run(ReactorThread *thr) OVERRIDE {
+    conn_->QueueOutbound(std::move(transfer_));
+    delete this;
+  }
+
+  // Fails the transfer with 'status' instead of queueing it.
+  virtual void Abort(const Status &status) OVERRIDE {
+    transfer_->Abort(status);
+    delete this;
+  }
+
+ private:
+  gscoped_ptr<OutboundTransfer> transfer_;
+  Connection *conn_;
+};
+
+void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
+  // Usually invoked by the worker thread that set the response, but in some
+  // circumstances also by the reactor thread (e.g. if the service has shut
+  // down). The transfer is therefore not queued directly: it is wrapped in a
+  // reactor task, and that task performs the actual QueueOutbound() call.
+
+  DCHECK_EQ(direction_, SERVER);
+
+  // Serialize first -- 'call' is moved into the callbacks right after.
+  TransferPayload payload;
+  const size_t slice_count = call->SerializeResponseTo(&payload);
+
+  // The callbacks object takes over the InboundCall, which is therefore freed
+  // once the response has been sent. Should the connection already be torn
+  // down, QueueOutbound() aborts the transfer when the task eventually runs,
+  // which ends up in ResponseTransferCallbacks::NotifyTransferAborted().
+  TransferCallbacks *callbacks = new ResponseTransferCallbacks(std::move(call), this);
+  gscoped_ptr<OutboundTransfer> response_transfer(
+      OutboundTransfer::CreateForCallResponse(payload, slice_count, callbacks));
+
+  // Ownership of the transfer passes to the task.
+  reactor_thread_->reactor()->ScheduleReactorTask(
+      new QueueTransferTask(std::move(response_transfer), this));
+}
+
+void Connection::set_confidential(bool is_confidential) {  // Plain setter for the confidentiality flag.
+ is_confidential_ = is_confidential;  // the constructor initializes this flag to false
+}
+
+bool Connection::SatisfiesCredentialsPolicy(CredentialsPolicy policy) const {
+  DCHECK_EQ(direction_, CLIENT);
+  if (policy == CredentialsPolicy::ANY_CREDENTIALS) return true;  // any connection will do
+  return policy == credentials_policy_;  // otherwise the policies must match exactly
+}
+
+RpczStore* Connection::rpcz_store() {  // Convenience accessor: reactor thread -> reactor -> messenger -> rpcz store.
+ return reactor_thread_->reactor()->messenger()->rpcz_store();  // the pointer comes from the messenger, not from this connection
+}
+
+void Connection::ReadHandler(ev::io &watcher, int revents) {  // Read-watcher callback: receives into the current inbound transfer and dispatches it once complete.
+ DCHECK(reactor_thread_->IsCurrentThread());
+
+ DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")";
+ if (revents & EV_ERROR) {
+ reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
+ ": ReadHandler encountered an error"));
+ return;  // return right after DestroyConnection(), without touching the connection again
+ }
+ last_activity_time_ = reactor_thread_->cur_time();
+ // 'inbound_' survives across invocations, so a partially received message is resumed on the next callback.
+ while (true) {  // runs at most once: every path below returns or reaches the final 'break'
+ if (!inbound_) {
+ inbound_.reset(new InboundTransfer());
+ }
+ Status status = inbound_->ReceiveBuffer(*socket_);
+ if (PREDICT_FALSE(!status.ok())) {
+ if (status.posix_code() == ESHUTDOWN) {
+ VLOG(1) << ToString() << " shut down by remote end.";  // logged at verbose level only, unlike other recv errors
+ } else {
+ LOG(WARNING) << ToString() << " recv error: " << status.ToString();
+ }
+ reactor_thread_->DestroyConnection(this, status);
+ return;
+ }
+ if (!inbound_->TransferFinished()) {
+ DVLOG(3) << ToString() << ": read is not yet finished yet.";
+ return;  // keep 'inbound_' and wait for the next readiness callback
+ }
+ DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes";
+ // A complete message arrived: hand it, together with ownership of the transfer, to the handler for our direction.
+ if (direction_ == CLIENT) {
+ HandleCallResponse(std::move(inbound_));
+ } else if (direction_ == SERVER) {
+ HandleIncomingCall(std::move(inbound_));
+ } else {
+ LOG(FATAL) << "Invalid direction: " << direction_;
+ }
+
+ // TODO: it would seem that it would be good to loop around and see if
+ // there is more data on the socket by trying another recv(), but it turns
+ // out that it really hurts throughput to do so. A better approach
+ // might be for each InboundTransfer to actually try to read an extra byte,
+ // and if it succeeds, then we'd copy that byte into a new InboundTransfer
+ // and loop around, since it's likely the next call also arrived at the
+ // same time.
+ break;
+ }
+}
+
+void Connection::HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  gscoped_ptr<InboundCall> call(new InboundCall(this));
+  const Status parse_status = call->ParseFrom(std::move(transfer));
+  if (!parse_status.ok()) {
+    // TODO: the connection probably ought to be shut down here, since whatever
+    // arrives next on this socket will be "unsynchronized".
+    LOG(WARNING) << ToString() << ": received bad data: " << parse_status.ToString();
+    return;
+  }
+  // Register the call as being handled, keyed by call ID, then pass it on.
+  const bool registered = InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get());
+  if (registered) {
+    reactor_thread_->reactor()->messenger()->QueueInboundCall(std::move(call));
+    return;
+  }
+  // A call with this ID is already being handled: give up on the connection.
+  LOG(WARNING) << ToString() << ": received call ID " << call->call_id() <<
+    " but was already processing this ID! Ignoring";
+  reactor_thread_->DestroyConnection(
+      this, Status::RuntimeError("Received duplicate call id",
+                                 Substitute("$0", call->call_id())));
+}
+
+void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
+  // Matches a fully received response to its pending call and completes that call.
+  DCHECK(reactor_thread_->IsCurrentThread());
+  gscoped_ptr<CallResponse> resp(new CallResponse);
+  // The bytes come from the remote peer, so a malformed response must not abort the
+  // whole process: tear down just this connection, as ReadHandler() does on recv errors.
+  const Status parse_status = resp->ParseFrom(std::move(transfer));
+  if (PREDICT_FALSE(!parse_status.ok())) {
+    LOG(WARNING) << ToString() << ": received bad response: " << parse_status.ToString();
+    reactor_thread_->DestroyConnection(this, parse_status);
+    return;  // return immediately, as every other DestroyConnection() call site does
+  }
+  CallAwaitingResponse *car_ptr = EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
+  if (PREDICT_FALSE(car_ptr == nullptr)) {
+    LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which "
+                 << "was not pending! Ignoring.";
+    return;
+  }
+  // Returns the entry to the pool on scope exit; its timeout timer is stopped by its destructor.
+  scoped_car car(car_pool_.make_scoped_ptr(car_ptr));
+  if (PREDICT_FALSE(!car->call)) {  // null 'call': the call already timed out or was cancelled
+    VLOG(1) << "Got response to call id " << resp->call_id() << " after client "
+            << "already timed out or cancelled";
+    return;
+  }
+  car->call->SetResponse(std::move(resp));
+  MaybeInjectCancellation(car->call);  // test hook: call is now FINISHED_SUCCESS or FINISHED_ERROR
+}
+
+void Connection::WriteHandler(ev::io &watcher, int revents) {  // Write-watcher callback: sends queued transfers in order until the socket stops accepting data or the queue is empty.
+ DCHECK(reactor_thread_->IsCurrentThread());
+
+ if (revents & EV_ERROR) {
+ reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
+ ": writeHandler encountered an error"));
+ return;
+ }
+ DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
+
+ OutboundTransfer *transfer;
+ if (outbound_transfers_.empty()) {
+ LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is "
+ "nothing to write.";
+ write_io_.stop();  // QueueOutbound() restarts the watcher when new work arrives
+ return;
+ }
+
+ while (!outbound_transfers_.empty()) {
+ transfer = &(outbound_transfers_.front());
+
+ if (!transfer->TransferStarted()) {
+ // No bytes of this transfer have gone out yet: outbound calls get vetted before sending starts.
+ if (transfer->is_for_outbound_call()) {
+ CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());  // a queued call transfer is expected to still have its entry
+ if (!car->call) {
+ // If the call has already timed out or has already been cancelled, the 'call'
+ // field would be set to NULL. In that case, don't bother sending it.
+ outbound_transfers_.pop_front();
+ transfer->Abort(Status::Aborted("already timed out or cancelled"));
+ delete transfer;
+ continue;
+ }
+
+ // If this is the start of the transfer, then check if the server has the
+ // required RPC flags. We have to wait until just before the transfer in
+ // order to ensure that the negotiation has taken place, so that the flags
+ // are available.
+ const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features();
+ if (!includes(remote_features_.begin(), remote_features_.end(),
+ required_features.begin(), required_features.end())) {
+ outbound_transfers_.pop_front();
+ Status s = Status::NotSupported("server does not support the required RPC features");
+ transfer->Abort(s);
+ Phase phase = negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION;
+ car->call->SetFailed(std::move(s), phase);
+ // Test cancellation when 'call_' is in 'FINISHED_ERROR' state.
+ MaybeInjectCancellation(car->call);
+ car->call.reset();  // the entry stays in awaiting_response_ with a null call
+ delete transfer;
+ continue;
+ }
+
+ car->call->SetSending();
+
+ // Test cancellation when 'call_' is in 'SENDING' state.
+ MaybeInjectCancellation(car->call);
+ }
+ }
+
+ last_activity_time_ = reactor_thread_->cur_time();
+ Status status = transfer->SendBuffer(*socket_);
+ if (PREDICT_FALSE(!status.ok())) {
+ LOG(WARNING) << ToString() << " send error: " << status.ToString();
+ reactor_thread_->DestroyConnection(this, status);
+ return;
+ }
+
+ if (!transfer->TransferFinished()) {
+ DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
+ return;  // partial send: the transfer stays at the queue front and write_io_ stays active
+ }
+
+ outbound_transfers_.pop_front();
+ delete transfer;
+ }
+
+ // If we were able to write all of our outbound transfers,
+ // we don't have any more to write.
+ write_io_.stop();
+}
+
+ std::string Connection::ToString() const {
+   // Other threads may call this, so the description is built only from
+   // 'direction_' and 'remote_'. Current connection state is left out on
+   // purpose, since it might change concurrently from another thread.
+   const char* description = "client connection to";
+   if (direction_ == SERVER) {
+     description = "server connection from";
+   }
+   return strings::Substitute("$0 $1", description, remote_.ToString());
+ }
+
+ // ReactorTask that moves a Connection out of connection negotiation and into
+ // regular RPC handling. On negotiation error the Connection is destroyed.
+ //
+ // The task frees itself at the end of both Run() and Abort().
+ class NegotiationCompletedTask : public ReactorTask {
+  public:
+   // 'negotiation_status' and 'rpc_error' carry the negotiation outcome that
+   // Run() passes on to the reactor thread.
+   NegotiationCompletedTask(Connection* conn,
+                            Status negotiation_status,
+                            std::unique_ptr<ErrorStatusPB> rpc_error)
+       : conn_(conn),
+         negotiation_status_(std::move(negotiation_status)),
+         rpc_error_(std::move(rpc_error)) {}
+
+   // Passes the negotiation outcome to 'rthread', then frees this task.
+   void Run(ReactorThread* rthread) OVERRIDE {
+     rthread->CompleteConnectionNegotiation(
+         conn_, negotiation_status_, std::move(rpc_error_));
+     delete this;
+   }
+
+   // Expected only while the reactor is closing (checked in debug builds).
+   // Logs 'status' and frees this task.
+   void Abort(const Status& status) OVERRIDE {
+     DCHECK(conn_->reactor_thread()->reactor()->closing());
+     VLOG(1) << "Failed connection negotiation due to shut down reactor thread: "
+             << status.ToString();
+     delete this;
+   }
+
+  private:
+   // Holds a reference to the connection for as long as the task exists.
+   scoped_refptr<Connection> conn_;
+   const Status negotiation_status_;
+   std::unique_ptr<ErrorStatusPB> rpc_error_;
+ };
+
+ void Connection::CompleteNegotiation(Status negotiation_status,
+                                      unique_ptr<ErrorStatusPB> rpc_error) {
+   // Wrap the negotiation outcome in a task and schedule it on this
+   // connection's reactor. The task deletes itself once it has run or been
+   // aborted.
+   NegotiationCompletedTask* const completion_task =
+       new NegotiationCompletedTask(this,
+                                    std::move(negotiation_status),
+                                    std::move(rpc_error));
+   reactor_thread_->reactor()->ScheduleReactorTask(completion_task);
+ }
+
+ void Connection::MarkNegotiationComplete() {
+   // Reactor-thread only (checked in debug builds).
+   DCHECK(reactor_thread_->IsCurrentThread());
+
+   // From here on WriteHandler() and DumpPB() see the connection as negotiated.
+   negotiation_complete_ = true;
+ }
+
+ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
+                           RpcConnectionPB* resp) {
+   DCHECK(reactor_thread_->IsCurrentThread());
+   resp->set_remote_ip(remote_.ToString());
+
+   // Report whether connection negotiation has finished.
+   if (!negotiation_complete_) {
+     resp->set_state(RpcConnectionPB::NEGOTIATING);
+   } else {
+     resp->set_state(RpcConnectionPB::OPEN);
+   }
+
+   switch (direction_) {
+     case CLIENT: {
+       // Dump every awaiting entry that still holds its OutboundCall.
+       for (const car_map_t::value_type& id_and_car : awaiting_response_) {
+         CallAwaitingResponse* pending = id_and_car.second;
+         if (pending->call) {
+           pending->call->DumpPB(req, resp->add_calls_in_flight());
+         }
+       }
+       resp->set_outbound_queue_size(num_queued_outbound_transfers());
+       break;
+     }
+     case SERVER: {
+       if (negotiation_complete_) {
+         // It's racy to dump credentials while negotiating, since the Connection
+         // object is owned by the negotiation thread at that point.
+         resp->set_remote_user_credentials(remote_user_.ToString());
+       }
+       // Dump every inbound call currently being handled.
+       for (const inbound_call_map_t::value_type& id_and_call : calls_being_handled_) {
+         InboundCall* handled = id_and_call.second;
+         handled->DumpPB(req, resp->add_calls_in_flight());
+       }
+       break;
+     }
+     default:
+       // Neither CLIENT nor SERVER.
+       LOG(FATAL);
+       break;
+   }
+   return Status::OK();
+ }
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
new file mode 100644
index 0000000..362a35b
--- /dev/null
+++ b/be/src/kudu/rpc/connection.h
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_CONNECTION_H
+#define KUDU_RPC_CONNECTION_H
+
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/optional/optional.hpp>
+#include <ev++.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/object_pool.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace rpc {
+
+class DumpRunningRpcsRequestPB;
+class InboundCall;
+class OutboundCall;
+class RpcConnectionPB;
+class ReactorThread;
+class RpczStore;
+enum class CredentialsPolicy;
+
+//
+// A connection between an endpoint and us.
+//
+// Inbound connections are created by AcceptorPools, which eventually schedule
+// RegisterConnection() to be called from the reactor thread.
+//
+// Outbound connections are created by the Reactor thread in order to service
+// outbound calls.
+//
+// Once a Connection is created, it can be used both for sending messages and
+// receiving them, but any given connection is explicitly a client or server.
+// If a pair of servers are making bidirectional RPCs, they will use two separate
+// TCP connections (and Connection objects).
+//
+// This class is not fully thread-safe. It is accessed only from the context of a
+// single ReactorThread except where otherwise specified.
+//
+ class Connection : public RefCountedThreadSafe<Connection> {
+ public:
+ enum Direction {
+ // This host is sending calls via this connection.
+ CLIENT,
+ // This host is receiving calls via this connection.
+ SERVER
+ };
+
+ // Create a new Connection.
+ // reactor_thread: the reactor that owns us.
+ // remote: the address of the remote end
+ // socket: the socket to take ownership of.
+ // direction: whether we are the client or server side
+ // policy: the credentials policy to use for connection negotiation.
+ Connection(ReactorThread *reactor_thread,
+ Sockaddr remote,
+ std::unique_ptr<Socket> socket,
+ Direction direction,
+ CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);
+
+ // Set underlying socket to non-blocking (or blocking) mode.
+ Status SetNonBlocking(bool enabled);
+
+ // Register our socket with an epoll loop. We will only ever be registered in
+ // one epoll loop at a time.
+ void EpollRegister(ev::loop_ref& loop);
+
+ ~Connection();
+
+ MonoTime last_activity_time() const {
+ return last_activity_time_;
+ }
+
+ // Returns true if we are not in the process of receiving or sending a
+ // message, and we have no outstanding calls.
+ bool Idle() const;
+
+ // Fail any calls which are currently queued or awaiting response.
+ // Prohibits any future calls (they will be failed immediately with this
+ // same Status).
+ void Shutdown(const Status& status,
+ std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+ // Queue a new call to be made. If the queueing fails, the call will be
+ // marked failed. The caller is expected to check if 'call' has been cancelled
+ // before making the call.
+ // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
+ void QueueOutboundCall(std::shared_ptr<OutboundCall> call);
+
+ // Queue a call response back to the client on the server side.
+ //
+ // This may be called from a non-reactor thread.
+ void QueueResponseForCall(gscoped_ptr<InboundCall> call);
+
+ // Cancel an outbound call by removing any reference to it by CallAwaitingResponse
+ // in 'awaiting_response_'.
+ void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // The address of the remote end of the connection.
+ const Sockaddr &remote() const { return remote_; }
+
+ // Set the ConnectionId for an outbound (CLIENT) connection. May only be set once.
+ void set_outbound_connection_id(ConnectionId conn_id) {
+ DCHECK_EQ(direction_, CLIENT);
+ DCHECK(!outbound_connection_id_);
+ outbound_connection_id_ = std::move(conn_id);
+ }
+
+ // Get the ConnectionId of this outbound (CLIENT) connection. It must have been set already.
+ const ConnectionId& outbound_connection_id() const {
+ DCHECK_EQ(direction_, CLIENT);
+ DCHECK(outbound_connection_id_);
+ return *outbound_connection_id_;
+ }
+
+ bool is_confidential() const {
+ return is_confidential_;
+ }
+
+ // Set/unset the 'confidentiality' property for this connection.
+ void set_confidential(bool is_confidential);
+
+ // Credentials policy to start connection negotiation.
+ CredentialsPolicy credentials_policy() const { return credentials_policy_; }
+
+ // Whether the connection satisfies the specified credentials policy.
+ //
+ // NOTE: The policy is set prior to connection negotiation, and the actual
+ // authentication credentials used for connection negotiation might
+ // effectively make the connection satisfy a stronger policy.
+ // An example: the credentials policy for the connection was set to
+ // ANY_CREDENTIALS, but since the authn token was not available
+ // at the time of negotiation, the primary credentials were used, making
+ // the connection de facto satisfying the PRIMARY_CREDENTIALS policy.
+ bool SatisfiesCredentialsPolicy(CredentialsPolicy policy) const;
+
+ RpczStore* rpcz_store();
+
+ // libev callback when data is available to read.
+ void ReadHandler(ev::io &watcher, int revents);
+
+ // libev callback when we may write to the socket.
+ void WriteHandler(ev::io &watcher, int revents);
+
+ // Safe to be called from other threads.
+ std::string ToString() const;
+
+ Direction direction() const { return direction_; }
+
+ Socket* socket() { return socket_.get(); }
+
+ // Go through the process of transferring control of the underlying socket back to the Reactor.
+ void CompleteNegotiation(Status negotiation_status,
+ std::unique_ptr<ErrorStatusPB> rpc_error);
+
+ // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
+ void MarkNegotiationComplete();
+
+ Status DumpPB(const DumpRunningRpcsRequestPB& req,
+ RpcConnectionPB* resp);
+
+ ReactorThread* reactor_thread() const { return reactor_thread_; }
+
+ std::unique_ptr<Socket> release_socket() {
+ return std::move(socket_);
+ }
+
+ void adopt_socket(std::unique_ptr<Socket> socket) {
+ socket_ = std::move(socket);
+ }
+
+ void set_remote_features(std::set<RpcFeatureFlag> remote_features) {
+ remote_features_ = std::move(remote_features);
+ }
+
+ void set_remote_user(RemoteUser user) {
+ DCHECK_EQ(direction_, SERVER);
+ remote_user_ = std::move(user);
+ }
+
+ const RemoteUser& remote_user() const {
+ DCHECK_EQ(direction_, SERVER);
+ return remote_user_;
+ }
+
+ // Whether the connection is scheduled for shutdown.
+ bool scheduled_for_shutdown() const {
+ DCHECK_EQ(direction_, CLIENT);
+ return scheduled_for_shutdown_;
+ }
+
+ // Mark the connection as scheduled to be shut down. Reactor does not dispatch
+ // new calls on such a connection.
+ void set_scheduled_for_shutdown() {
+ DCHECK_EQ(direction_, CLIENT);
+ scheduled_for_shutdown_ = true;
+ }
+
+ size_t num_queued_outbound_transfers() const {
+ return outbound_transfers_.size();
+ }
+
+ private:
+ friend struct CallAwaitingResponse;
+ friend class QueueTransferTask;
+ friend struct CallTransferCallbacks;
+ friend struct ResponseTransferCallbacks;
+
+ // An outbound call for which we're still awaiting the server's response.
+ // WriteHandler() looks these up before a transfer starts. This is used on the client side only.
+ struct CallAwaitingResponse {
+ ~CallAwaitingResponse();
+
+ // Notification from libev that the call has timed out.
+ void HandleTimeout(ev::timer &watcher, int revents);
+
+ Connection *conn;
+ std::shared_ptr<OutboundCall> call;
+ ev::timer timeout_timer;
+
+ // We time out RPC calls in two stages. This is set to the amount of timeout
+ // remaining after the next timeout fires. See Connection::QueueOutboundCall().
+ double remaining_timeout;
+ };
+
+ typedef std::unordered_map<uint64_t, CallAwaitingResponse*> car_map_t;
+ typedef std::unordered_map<uint64_t, InboundCall*> inbound_call_map_t;
+
+ // Returns the next valid (non-negative) sequential call ID by incrementing a counter
+ // and ensuring we roll over from INT32_MAX to 0.
+ // Negative numbers are reserved for special purposes.
+ int32_t GetNextCallId() {
+ int32_t call_id = next_call_id_;
+ if (PREDICT_FALSE(next_call_id_ == std::numeric_limits<int32_t>::max())) {
+ next_call_id_ = 0;
+ } else {
+ next_call_id_++;
+ }
+ return call_id;
+ }
+
+ // An incoming packet has completed transferring on the server side.
+ // This parses the call and delivers it into the call queue.
+ void HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer);
+
+ // An incoming packet has completed on the client side. This parses the
+ // call response, looks up the CallAwaitingResponse, and calls the
+ // client callback.
+ void HandleCallResponse(gscoped_ptr<InboundTransfer> transfer);
+
+ // The given CallAwaitingResponse has elapsed its user-defined timeout.
+ // Set it to Failed.
+ void HandleOutboundCallTimeout(CallAwaitingResponse *car);
+
+ // Queue a transfer for sending on this connection.
+ // We will take ownership of the transfer.
+ // This must be called from the reactor thread.
+ void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer);
+
+ // Internal test function for injecting cancellation request when 'call'
+ // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
+ void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+
+ // The reactor thread that created this connection.
+ ReactorThread* const reactor_thread_;
+
+ // The remote address we're talking to.
+ const Sockaddr remote_;
+
+ // The socket we're communicating on.
+ std::unique_ptr<Socket> socket_;
+
+ // The ConnectionId that serves as a key into the client connection map
+ // within this reactor. Only set in the case of outbound connections.
+ boost::optional<ConnectionId> outbound_connection_id_;
+
+ // The authenticated remote user (if this is an inbound connection on the server).
+ RemoteUser remote_user_;
+
+ // whether we are client or server
+ Direction direction_;
+
+ // The last time we read or wrote from the socket.
+ MonoTime last_activity_time_;
+
+ // the inbound transfer, if any
+ gscoped_ptr<InboundTransfer> inbound_;
+
+ // notifies us when our socket is writable.
+ ev::io write_io_;
+
+ // notifies us when our socket is readable.
+ ev::io read_io_;
+
+ // Set to true when the connection is registered on a loop.
+ // This is used for a sanity check in the destructor that we are properly
+ // un-registered before shutting down.
+ bool is_epoll_registered_;
+
+ // Transfers waiting to be sent; WriteHandler() sends them front to back.
+ boost::intrusive::list<OutboundTransfer> outbound_transfers_; // NOLINT(*)
+
+ // Outbound calls for which we are awaiting a response, keyed by call ID.
+ car_map_t awaiting_response_;
+
+ // Calls which have been received on the server and are currently
+ // being handled.
+ inbound_call_map_t calls_being_handled_;
+
+ // the next call ID to use
+ int32_t next_call_id_;
+
+ // Starts as Status::OK, gets set to a shutdown status upon Shutdown().
+ Status shutdown_status_;
+
+ // RPC features supported by the remote end of the connection.
+ std::set<RpcFeatureFlag> remote_features_;
+
+ // Pool from which CallAwaitingResponse objects are allocated.
+ // Also a funny name.
+ ObjectPool<CallAwaitingResponse> car_pool_;
+ typedef ObjectPool<CallAwaitingResponse>::scoped_ptr scoped_car;
+
+ // The credentials policy to use for connection negotiation. It defines which
+ // type of user credentials is used to negotiate a connection. The actual type of
+ // credentials used for authentication during the negotiation process depends
+ // on the credentials availability, but the resulting credentials are guaranteed to
+ // always satisfy the specified credentials policy. In other words, the actual
+ // type of credentials used for connection negotiation might effectively make
+ // the connection satisfy a stronger/narrower policy.
+ //
+ // An example:
+ // The credentials policy for the connection was set to ANY_CREDENTIALS,
+ // but since no secondary credentials (such as an authn token) were available
+ // at the time of negotiation, the primary credentials were used, making the
+ // connection satisfying the PRIMARY_CREDENTIALS policy de facto.
+ const CredentialsPolicy credentials_policy_;
+
+ // Whether we completed connection negotiation.
+ bool negotiation_complete_;
+
+ // Whether it's OK to pass confidential information over the connection.
+ // For example, an encrypted (but not necessarily authenticated) connection
+ // is considered confidential.
+ bool is_confidential_;
+
+ // Whether the connection is scheduled for shutdown.
+ bool scheduled_for_shutdown_;
+ };
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
new file mode 100644
index 0000000..6720807
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/connection_id.h"
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/functional/hash/hash.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+ // Default-constructs every member; nothing else to set up.
+ ConnectionId::ConnectionId() = default;
+
+ // Builds an id from the resolved address, the original hostname and the
+ // user credentials. An empty hostname is a fatal error.
+ ConnectionId::ConnectionId(const Sockaddr& remote,
+                            std::string hostname,
+                            UserCredentials user_credentials)
+     : remote_(remote),
+       hostname_(std::move(hostname)),
+       user_credentials_(std::move(user_credentials)) {
+   // Checked on the member: the 'hostname' parameter has been moved from.
+   CHECK(!hostname_.empty());
+ }
+
+ // Replaces the stored credentials. The new credentials are expected to name
+ // a real user (checked in debug builds only).
+ void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
+   DCHECK(user_credentials.has_real_user());
+   user_credentials_ = std::move(user_credentials);
+ }
+
+ string ConnectionId::ToString() const {
+   // Start from the resolved address. Append the original hostname in
+   // parentheses only when it differs from the address's host part.
+   string remote = remote_.ToString();
+   if (hostname_ != remote_.host()) {
+     remote = strings::Substitute("$0 ($1)", remote, hostname_);
+   }
+
+   const string credentials = user_credentials_.ToString();
+   return strings::Substitute("{remote=$0, user_credentials=$1}",
+                              remote,
+                              credentials);
+ }
+
+ size_t ConnectionId::HashCode() const {
+   // Fold in the same three fields that Equals() compares. The combine order
+   // is fixed: remote address, hostname, user credentials.
+   size_t hash = 0;
+   boost::hash_combine(hash, remote_.HashCode());
+   boost::hash_combine(hash, hostname_);
+   boost::hash_combine(hash, user_credentials_.HashCode());
+   return hash;
+ }
+
+ bool ConnectionId::Equals(const ConnectionId& other) const {
+   // Compare field by field, bailing out on the first mismatch.
+   if (!(remote() == other.remote())) {
+     return false;
+   }
+   if (!(hostname_ == other.hostname_)) {
+     return false;
+   }
+   return user_credentials().Equals(other.user_credentials());
+ }
+
+ // Hash functor for unordered containers: delegates to ConnectionId::HashCode().
+ size_t ConnectionIdHash::operator()(const ConnectionId& conn_id) const {
+   const size_t hash = conn_id.HashCode();
+   return hash;
+ }
+
+ // Equality functor for unordered containers: delegates to ConnectionId::Equals().
+ bool ConnectionIdEqual::operator()(const ConnectionId& cid1,
+                                    const ConnectionId& cid2) const {
+   const bool same = cid1.Equals(cid2);
+   return same;
+ }
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
new file mode 100644
index 0000000..67a4786
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/util/net/sockaddr.h"
+
+namespace kudu {
+namespace rpc {
+
+ // Used to key on Connection information.
+ // For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
+ // This class is copyable and copy-assignable; CopyFrom() is an explicit
+ // spelling of copy assignment.
+ class ConnectionId {
+  public:
+   ConnectionId();
+
+   // Copy constructor required for use with STL unordered_map.
+   ConnectionId(const ConnectionId& other) = default;
+
+   // Copy assignment, declared explicitly next to the defaulted copy
+   // constructor so the pair is visible in one place.
+   ConnectionId& operator=(const ConnectionId& other) = default;
+
+   // Convenience constructor.
+   ConnectionId(const Sockaddr& remote,
+                std::string hostname,
+                UserCredentials user_credentials);
+
+   // The remote address.
+   const Sockaddr& remote() const { return remote_; }
+
+   // The host name this id was created with.
+   const std::string& hostname() const { return hostname_; }
+
+   // The credentials of the user associated with this connection, if any.
+   void set_user_credentials(UserCredentials user_credentials);
+
+   const UserCredentials& user_credentials() const { return user_credentials_; }
+
+   // Copy state from another object to this one.
+   //
+   // Defined inline here because connection_id.cc provides no definition for
+   // it; with only a declaration, any caller would fail at link time.
+   void CopyFrom(const ConnectionId& other) { *this = other; }
+
+   // Returns a string representation of the object, not including the password field.
+   std::string ToString() const;
+
+   size_t HashCode() const;
+   bool Equals(const ConnectionId& other) const;
+
+  private:
+   // Remember to update HashCode() and Equals() when new fields are added.
+   Sockaddr remote_;
+
+   // The original host name before it was resolved to 'remote_'.
+   // This must be retained since it is used to compute Kerberos Service Principal Names (SPNs).
+   std::string hostname_;
+
+   UserCredentials user_credentials_;
+ };
+
+ // Hash functor so ConnectionId can be used as a key in unordered STL containers.
+ class ConnectionIdHash {
+  public:
+   // Returns the hash code of 'conn_id'.
+   std::size_t operator()(const ConnectionId& conn_id) const;
+ };
+
+ // Equality functor so ConnectionId can be used as a key in unordered STL containers.
+ class ConnectionIdEqual {
+  public:
+   // Returns true when 'cid1' and 'cid2' compare equal.
+   bool operator()(const ConnectionId& cid1,
+                   const ConnectionId& cid2) const;
+ };
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/constants.cc b/be/src/kudu/rpc/constants.cc
new file mode 100644
index 0000000..a4e024c
--- /dev/null
+++ b/be/src/kudu/rpc/constants.cc
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/constants.h"
+
+using std::set;
+
+namespace kudu {
+namespace rpc {
+
+ // Definitions for the string constants declared in constants.h.
+ const char* const kMagicNumber{"hrpc"};
+ const char* const kSaslAppName{"kudu"};
+
+ // Both feature-flag sets start out holding only APPLICATION_FEATURE_FLAGS.
+ //
+ // NOTE: the TLS flag is dynamically added based on the local encryption
+ // configuration.
+ //
+ // NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both
+ // sides based on the remote peer's address.
+ set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags{ APPLICATION_FEATURE_FLAGS };
+ set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags{ APPLICATION_FEATURE_FLAGS };
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/constants.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/constants.h b/be/src/kudu/rpc/constants.h
new file mode 100644
index 0000000..a3c7c67
--- /dev/null
+++ b/be/src/kudu/rpc/constants.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RPC_CONSTANTS_H
+#define KUDU_RPC_RPC_CONSTANTS_H
+
+#include <cstdint>
+#include <set>
+
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+ // Magic number bytes sent at connection setup time.
+ extern const char* const kMagicNumber;
+
+ // Application name passed to the SASL library when it is initialized.
+ extern const char* const kSaslAppName;
+
+ // Current version of the RPC protocol.
+ static constexpr uint32_t kCurrentRpcVersion = 9;
+
+ // Reserved (negative) call IDs. From Hadoop.
+ static constexpr int32_t kInvalidCallId = -2;
+ static constexpr int32_t kConnectionContextCallId = -3;
+ static constexpr int32_t kNegotiateCallId = -33;
+
+ // Lengths, in bytes, of the magic number and of the header flags.
+ static constexpr uint8_t kMagicNumberLength = 4;
+ static constexpr uint8_t kHeaderFlagsLength = 3;
+
+ // There is a 4-byte length prefix before any packet.
+ static constexpr uint8_t kMsgLengthPrefixLength = 4;
+
+ // The set of RPC features that this server build supports.
+ // Non-const for testing.
+ extern std::set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags;
+
+ // The set of RPC features that this client build supports.
+ // Non-const for testing.
+ extern std::set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags;
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_RPC_CONSTANTS_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/exactly_once_rpc-test.cc b/be/src/kudu/rpc/exactly_once_rpc-test.cc
new file mode 100644
index 0000000..c94e89c
--- /dev/null
+++ b/be/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rtest.pb.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DECLARE_int64(remember_clients_ttl_ms);
+DECLARE_int64(remember_responses_ttl_ms);
+DECLARE_int64(result_tracker_gc_interval_ms);
+
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using std::atomic_int;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+namespace {
+// Client ID attached to the requests issued by this test's default client.
+const char* const kClientId = "test-client";
+// Attaches a RequestIdPB (with first_incomplete_seq_no == seq_no) to 'controller'.
+void AddRequestId(RpcController* controller,
+ const std::string& client_id,
+ ResultTracker::SequenceNumber sequence_number,
+ int64_t attempt_no) {
+ unique_ptr<RequestIdPB> request_id(new RequestIdPB());
+ request_id->set_client_id(client_id);
+ request_id->set_seq_no(sequence_number);
+ request_id->set_attempt_no(attempt_no);
+ request_id->set_first_incomplete_seq_no(sequence_number);
+ controller->SetRequestIdPB(std::move(request_id));
+}
+// ServerPicker that always picks the single proxy it was constructed with.
+class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
+ public:
+ explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {}
+
+ void PickLeader(const ServerPickedCallback& callback, const MonoTime& /*deadline*/) override {
+ callback.Run(Status::OK(), proxy_);
+ }
+
+ void MarkServerFailed(CalculatorServiceProxy*, const Status&) override {}
+ void MarkReplicaNotLeader(CalculatorServiceProxy*) override {}
+ void MarkResourceNotFound(CalculatorServiceProxy*) override {}
+
+ private:
+ CalculatorServiceProxy* proxy_;  // Not owned.
+};
+
+} // anonymous namespace
+// Test RPC that issues AddExactlyOnce calls and CHECKs that all retries see the same response.
+class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
+ ExactlyOnceRequestPB,
+ ExactlyOnceResponsePB> {
+ public:
+ CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker,
+ const scoped_refptr<RequestTracker>& request_tracker,
+ const MonoTime& deadline,
+ shared_ptr<Messenger> messenger,
+ int value,
+ CountDownLatch* latch,
+ int server_sleep = 0)
+ : RetriableRpc(server_picker, request_tracker, deadline, std::move(messenger)),
+ latch_(latch) {
+ req_.set_value_to_add(value);
+ req_.set_randomly_fail(true);
+ req_.set_sleep_for_ms(server_sleep);
+ }
+ // Sends one attempt of the request to 'server', handing 'callback' to the async proxy call.
+ void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
+ server->AddExactlyOnceAsync(req_,
+ &resp_,
+ mutable_retrier()->mutable_controller(),
+ callback);
+ }
+ // Classifies the state of the current attempt; CHECK-fails on errors the test does not expect.
+ RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override {
+ // We shouldn't get errors from the server/rpc system since we set a high timeout.
+ CHECK_OK(rpc_cb_status);
+ // A stale request maps to NON_RETRIABLE_ERROR, any other remote error to SERVICE_UNAVAILABLE.
+ if (!mutable_retrier()->controller().status().ok()) {
+ CHECK(mutable_retrier()->controller().status().IsRemoteError());
+ if (mutable_retrier()->controller().error_response()->code()
+ == ErrorStatusPB::ERROR_REQUEST_STALE) {
+ return { RetriableRpcStatus::NON_RETRIABLE_ERROR,
+ mutable_retrier()->controller().status() };
+ }
+ return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+ mutable_retrier()->controller().status() };
+ }
+
+ // If the controller is not finished we're in the ReplicaFoundCb() callback.
+ // Return ok to proceed with the call to the server.
+ if (!mutable_retrier()->mutable_controller()->finished()) {
+ return { RetriableRpcStatus::OK, Status::OK() };
+ }
+
+ // If we've received a response in the past, all following responses must match.
+ // NOTE(review): assumes IsInitialized() is false until the first CopyFrom() -- confirm.
+ if (!successful_response_.IsInitialized()) {
+ successful_response_.CopyFrom(resp_);
+ } else {
+ CHECK_EQ(SecureDebugString(successful_response_),
+ SecureDebugString(resp_));
+ }
+
+ if (sometimes_retry_successful_) {
+ // Still report errors, with some probability. This will cause requests to
+ // be retried. Since the requests were originally successful we should get
+ // the same reply back.
+ int random = rand() % 4;
+ switch (random) {
+ case 0:
+ return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+ Status::RemoteError("") };
+ case 1:
+ return { RetriableRpcStatus::RESOURCE_NOT_FOUND,
+ Status::RemoteError("") };
+ case 2:
+ return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE,
+ Status::RemoteError("") };
+ case 3:
+ return { RetriableRpcStatus::OK, Status::OK() };
+ default: LOG(FATAL) << "Unexpected value";
+ }
+ }
+ return { RetriableRpcStatus::OK, Status::OK() };
+ }
+ // Called with the final status: CHECKs that it is OK, counts down the latch and self-deletes.
+ void Finish(const Status& status) override {
+ CHECK_OK(status);
+ latch_->CountDown();
+ delete this;
+ }
+
+ std::string ToString() const override { return "test-rpc"; }
+ CountDownLatch* latch_;  // Counted down once, in Finish().
+ ExactlyOnceResponsePB successful_response_;  // Copy of the first finished response seen.
+ bool sometimes_retry_successful_ = true;  // When true, finished calls are randomly re-reported as errors.
+};
+// Test fixture: starts a server plus client proxy and offers helpers to issue exactly-once RPCs.
+class ExactlyOnceRpcTest : public RpcTestBase {
+ public:
+ void SetUp() override {
+ RpcTestBase::SetUp();
+ SeedRandom();
+ }
+ // Starts the test server and (re)creates the client messenger, proxy, picker and tracker.
+ Status StartServer() {
+ // Set up server.
+ RETURN_NOT_OK(StartTestServerWithGeneratedCode(&server_addr_));
+ RETURN_NOT_OK(CreateMessenger("Client", &client_messenger_));
+ proxy_.reset(new CalculatorServiceProxy(
+ client_messenger_, server_addr_, server_addr_.host()));
+ test_picker_.reset(new TestServerPicker(proxy_.get()));
+ request_tracker_.reset(new RequestTracker(kClientId));
+ attempt_nos_ = 0;
+
+ return Status::OK();
+ }
+
+ // An exactly once adder that uses RetriableRpc to perform the requests.
+ struct RetriableRpcExactlyOnceAdder {
+ RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
+ const scoped_refptr<RequestTracker>& request_tracker,
+ shared_ptr<Messenger> messenger,
+ int value,
+ int server_sleep = 0) : latch_(1) {
+ MonoTime now = MonoTime::Now();
+ now.AddDelta(MonoDelta::FromMilliseconds(10000));  // RPC deadline: 10000 ms from now.
+ rpc_ = new CalculatorServiceRpc(server_picker,
+ request_tracker,
+ now,
+ std::move(messenger),
+ value,
+ &latch_,
+ server_sleep);
+ }
+ // Runs SleepAndSend() on a new thread, which is stored in 'thread'.
+ void Start() {
+ CHECK_OK(kudu::Thread::Create(
+ "test",
+ "test",
+ &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread));
+ }
+ // Sends the RPC and blocks until its latch is counted down (no sleep, despite the name).
+ void SleepAndSend() {
+ rpc_->SendRpc();
+ latch_.Wait();
+ }
+
+ CountDownLatch latch_;
+ scoped_refptr<kudu::Thread> thread;
+ CalculatorServiceRpc* rpc_;  // Not deleted here: the RPC deletes itself in Finish().
+ };
+
+ // One of several simultaneous callers of the same request: sleeps on the client side, then
+ // sends a single AddExactlyOnce call with a caller-chosen sequence and attempt number.
+ struct SimultaneousExactlyOnceAdder {
+ SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p,
+ ResultTracker::SequenceNumber sequence_number,
+ int value,
+ uint64_t client_sleep,
+ uint64_t server_sleep,
+ int64_t attempt_no)
+ : proxy(p),
+ client_sleep_for_ms(client_sleep) {
+ req.set_value_to_add(value);
+ req.set_sleep_for_ms(server_sleep);
+ AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+ }
+ // Runs SleepAndSend() on a new thread, which is stored in 'thread'.
+ void Start() {
+ CHECK_OK(kudu::Thread::Create(
+ "test",
+ "test",
+ &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread));
+ }
+
+ // Sleeps the preset number of msecs before sending the call.
+ void SleepAndSend() {
+ usleep(client_sleep_for_ms * 1000);
+ controller.set_timeout(MonoDelta::FromSeconds(20));
+ CHECK_OK(proxy->AddExactlyOnce(req, &resp, &controller));
+ }
+
+ CalculatorServiceProxy* const proxy;
+ const uint64_t client_sleep_for_ms;
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ ExactlyOnceResponsePB resp;
+ scoped_refptr<kudu::Thread> thread;
+ };
+
+ // Adds 0 using a fresh sequence number and asserts the server's value equals 'expected_value'.
+ void CheckValueMatches(int expected_value) {
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ req.set_value_to_add(0);
+ ExactlyOnceResponsePB resp;
+ RequestTracker::SequenceNumber seq_no;
+ CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
+ AddRequestId(&controller, kClientId, seq_no, 0);
+ ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+ ASSERT_EQ(resp.current_val(), expected_value);
+ request_tracker_->RpcCompleted(seq_no);
+ }
+
+ // Continuously issues adds of 1 to the server for 'run_for'; the calls often last longer
+ // than 'remember_responses_ttl_ms' and must not return errors. Finally CHECKs, with an add
+ // of 0, that the server's value equals the number of adds issued.
+ void DoLongWritesThread(MonoDelta run_for) {
+ MonoTime run_until = MonoTime::Now();
+ run_until.AddDelta(run_for);
+ int counter = 0;
+ while (MonoTime::Now() < run_until) {
+ unique_ptr<RetriableRpcExactlyOnceAdder> adder(new RetriableRpcExactlyOnceAdder(
+ test_picker_, request_tracker_, client_messenger_, 1,
+ rand() % (2 * FLAGS_remember_responses_ttl_ms)));
+
+ // This thread is used in the stress test where we're constantly running GC.
+ // So, once we get a "success" response, it's likely that the result will be
+ // GCed on the server side, and thus it's not safe to spuriously retry.
+ adder->rpc_->sometimes_retry_successful_ = false;
+ adder->SleepAndSend();
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+ counter++;
+ }
+ ExactlyOnceResponsePB response;
+ ResultTracker::SequenceNumber sequence_number;
+ CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+ CHECK_OK(MakeAddCall(sequence_number, 0, &response));
+ CHECK_EQ(response.current_val(), counter);
+ request_tracker_->RpcCompleted(sequence_number);
+ }
+
+ // Stubbornly sends the same request to the server; this should observe three states:
+ // the request succeeds at first, then its result is GCed (the call fails with a remote
+ // error) and finally the client itself is GCed (the call succeeds with a new response).
+ void StubbornlyWriteTheSameRequestThread(ResultTracker::SequenceNumber sequence_number,
+ MonoDelta run_for) {
+ MonoTime run_until = MonoTime::Now();
+ run_until.AddDelta(run_for);
+ // Make an initial request, so that we get a response to compare to.
+ ExactlyOnceResponsePB original_response;
+ CHECK_OK(MakeAddCall(sequence_number, 0, &original_response));
+
+ // Now repeat the same request. At first we should get the same response, then the result
+ // should be GCed and we should get STALE back. Finally the request should succeed again
+ // but we should get a new response.
+ bool result_gced = false;
+ bool client_gced = false;
+ while (MonoTime::Now() < run_until) {
+ ExactlyOnceResponsePB response;
+ Status s = MakeAddCall(sequence_number, 0, &response);
+ if (s.ok()) {
+ if (!result_gced) {
+ CHECK_EQ(SecureDebugString(response), SecureDebugString(original_response));
+ } else {
+ client_gced = true;
+ CHECK_NE(SecureDebugString(response), SecureDebugString(original_response));
+ }
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+ } else if (s.IsRemoteError()) {
+ result_gced = true;
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms * 2));
+ }
+ }
+ CHECK(result_gced);
+ CHECK(client_gced);
+ }
+ // Sends one synchronous AddExactlyOnce; attempt_no == -1 means "use the next attempt number".
+ Status MakeAddCall(ResultTracker::SequenceNumber sequence_number,
+ int value_to_add,
+ ExactlyOnceResponsePB* response,
+ int attempt_no = -1) {
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ req.set_value_to_add(value_to_add);
+ if (attempt_no == -1) attempt_no = attempt_nos_.fetch_add(1);
+ AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+ Status s = proxy_->AddExactlyOnce(req, response, &controller);
+ return s;
+ }
+
+ protected:
+ Sockaddr server_addr_;
+ atomic_int attempt_nos_;  // Source of attempt numbers; reset to 0 in StartServer().
+ shared_ptr<Messenger> client_messenger_;
+ std::unique_ptr<CalculatorServiceProxy> proxy_;
+ scoped_refptr<TestServerPicker> test_picker_;
+ scoped_refptr<RequestTracker> request_tracker_;
+};
+
+// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
+// same sequence number as previous requests.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
+ ASSERT_OK(StartServer());
+ ExactlyOnceResponsePB original_resp;
+ int64_t mem_consumption = mem_tracker_->consumption();
+ {
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ req.set_value_to_add(1);
+
+ // Assign id 0.
+ AddRequestId(&controller, kClientId, 0, 0);
+
+ // Send the request the first time.
+ ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));
+
+ // The incremental usage of a new client is the size of the response itself
+ // plus some fixed overhead for the client-tracking structure.
+ int expected_incremental_usage = original_resp.SpaceUsed() + 200;
+
+ // Kept in 64 bits, like the tracker readings elsewhere in this file, to avoid narrowing.
+ int64_t mem_consumption_after = mem_tracker_->consumption();
+ ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
+ mem_consumption = mem_consumption_after;
+ }
+
+ // Now repeat the rpc 10 times, using the same sequence number, none of these should be executed
+ // and they should get the same response back.
+ for (int i = 0; i < 10; i++) {
+ RpcController controller;
+ controller.set_timeout(MonoDelta::FromSeconds(20));
+ ExactlyOnceRequestPB req;
+ req.set_value_to_add(1);
+ ExactlyOnceResponsePB resp;
+ AddRequestId(&controller, kClientId, 0, i + 1);
+ ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+ ASSERT_EQ(resp.current_val(), 1);
+ ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
+ // Sleep to give the MemTracker time to update -- we don't expect any update,
+ // but if we had a bug here, we'd only see it with this sleep.
+ SleepFor(MonoDelta::FromMilliseconds(100));
+ // We shouldn't have consumed any more memory since the responses were cached.
+ ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
+ }
+
+ // Making a new request, from a new client, should double the memory consumption.
+ {
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ ExactlyOnceResponsePB resp;
+ req.set_value_to_add(1);
+
+ // Assign id 0.
+ AddRequestId(&controller, "test-client2", 0, 0);
+
+ // Send the first request for this new client.
+ ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+ ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
+ }
+}
+
+// Performs a series of requests in which each single request is attempted multiple times, as
+// the server side is instructed to spuriously fail attempts.
+// In CalculatorServiceRpc we make sure that the same response is returned by all retries and,
+// after all the rpcs are done, we make sure that the final result is the expected one.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+ ASSERT_OK(StartServer());
+ int kNumIterations = 10;
+ int kNumRpcs = 10;
+
+ if (AllowSlowTests()) {
+ kNumIterations = 100;
+ kNumRpcs = 100;
+ }
+
+ int count = 0;  // Running sum of every value added so far, across all iterations.
+ for (int i = 0; i < kNumIterations; i ++) {
+ vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders;
+ for (int j = 0; j < kNumRpcs; j++) {
+ unique_ptr<RetriableRpcExactlyOnceAdder> adder(
+ new RetriableRpcExactlyOnceAdder(test_picker_, request_tracker_, client_messenger_, j));
+ adders.push_back(std::move(adder));
+ adders[j]->Start();
+ count += j;
+ }
+ for (int j = 0; j < kNumRpcs; j++) {
+ CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+ }
+ CheckValueMatches(count);
+ }
+}
+
+// Performs a series of requests in which each single request is attempted by multiple threads.
+// On each iteration, after all the threads complete, we expect that the add operation was
+// executed exactly once.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
+ ASSERT_OK(StartServer());
+ int kNumIterations = 10;
+ int kNumThreads = 10;
+
+ if (AllowSlowTests()) {
+ kNumIterations = 100;
+ kNumThreads = 100;
+ }
+
+ ResultTracker::SequenceNumber sequence_number = 0;
+ const int64_t memory_consumption_initial = mem_tracker_->consumption();
+
+ // Measure the size of a single response from the same client (the add with sequence number 0).
+ ExactlyOnceResponsePB resp;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+ const int64_t single_response_size = resp.SpaceUsed();
+
+ for (int i = 1; i <= kNumIterations; i ++) {
+ vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
+ for (int j = 0; j < kNumThreads; j++) {
+ unique_ptr<SimultaneousExactlyOnceAdder> adder(
+ new SimultaneousExactlyOnceAdder(proxy_.get(),
+ i, // sequence number
+ 1, // value
+ rand() % 20, // client_sleep
+ rand() % 10, // server_sleep
+ attempt_nos_.fetch_add(1))); // attempt number
+ adders.push_back(std::move(adder));
+ adders[j]->Start();
+ }
+ uint64_t time_micros = 0;
+ for (int j = 0; j < kNumThreads; j++) {
+ CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+ ASSERT_EQ(adders[j]->resp.current_val(), i + 1);
+ if (time_micros == 0) {
+ time_micros = adders[j]->resp.current_time_micros();
+ } else {
+ ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
+ }
+ }
+
+ // After all adders finished we should have consumed at least the size of one more response.
+ // The actual size depends on multiple factors, for instance, how many calls were "attached"
+ // (which is timing dependent) so we can't be more precise than this.
+ ASSERT_GT(mem_tracker_->consumption(),
+ memory_consumption_initial + single_response_size * i);
+ }
+}
+// Checks the GC lifecycle of a tracked request: cached response, then STALE, then a new response.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
+ FLAGS_remember_clients_ttl_ms = 500;
+ FLAGS_remember_responses_ttl_ms = 100;
+
+ ASSERT_OK(StartServer());
+
+ // Make a request.
+ ExactlyOnceResponsePB original;
+ ResultTracker::SequenceNumber sequence_number = 0;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &original));
+
+ // Making the same request again should return the same response.
+ ExactlyOnceResponsePB resp;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+ ASSERT_EQ(SecureShortDebugString(original), SecureShortDebugString(resp));
+
+ // Now sleep for 'remember_responses_ttl_ms' and run GC (which must release memory); we
+ // should then get a STALE back.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_responses_ttl_ms));
+ int64_t memory_consumption = mem_tracker_->consumption();
+ result_tracker_->GCResults();
+ ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+ resp.Clear();
+ Status s = MakeAddCall(sequence_number, 1, &resp);
+ ASSERT_TRUE(s.IsRemoteError());
+ ASSERT_STR_CONTAINS(s.ToString(), "is stale");
+
+ // Sleep again, this time for 'remember_clients_ttl_ms' and run GC again.
+ // The request should be successful, but its response should be a new one.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms));
+ memory_consumption = mem_tracker_->consumption();
+ result_tracker_->GCResults();
+ ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+ resp.Clear();
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+ ASSERT_NE(SecureShortDebugString(resp), SecureShortDebugString(original));
+}
+
+// This test creates a thread continuously making requests to the server, some lasting longer
+// than the GC period, at the same time it runs GC, making sure that the corresponding
+// CompletionRecords/ClientStates are not deleted from underneath the ongoing requests.
+// This also creates a thread that runs GC very frequently and another thread that sends the
+// same request over and over and observes the possible states: request is ok, request is stale,
+// request is ok again (because the client was forgotten).
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
+ FLAGS_remember_clients_ttl_ms = 100;
+ FLAGS_remember_responses_ttl_ms = 10;
+ FLAGS_result_tracker_gc_interval_ms = 10;
+
+ ASSERT_OK(StartServer());
+
+ // The write thread runs for a shorter period to make sure client GC has a
+ // chance to run.
+ MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
+ MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
+ if (AllowSlowTests()) {
+ writes_run_for = MonoDelta::FromSeconds(10);
+ stubborn_run_for = MonoDelta::FromSeconds(11);
+ }
+
+ result_tracker_->StartGCThread();
+
+ // Assign the first sequence number (0) to the 'stubborn writes' thread.
+ // This thread will keep making RPCs with this sequence number while
+ // the 'write_thread' will make normal requests with increasing sequence
+ // numbers.
+ ResultTracker::SequenceNumber stubborn_req_seq_num;
+ CHECK_OK(request_tracker_->NewSeqNo(&stubborn_req_seq_num));
+ ASSERT_EQ(stubborn_req_seq_num, 0);
+
+ scoped_refptr<kudu::Thread> stubborn_thread;
+ CHECK_OK(kudu::Thread::Create(
+ "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
+ this, stubborn_req_seq_num, stubborn_run_for, &stubborn_thread));
+
+ scoped_refptr<kudu::Thread> write_thread;
+ CHECK_OK(kudu::Thread::Create(
+ "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
+ this, writes_run_for, &write_thread));
+
+ write_thread->Join();
+ stubborn_thread->Join();
+
+ // Within a few seconds, the consumption should be back to zero.
+ // Really, this should be within 100ms, but we'll give it a bit of
+ // time to avoid test flakiness.
+ AssertEventually([&]() {
+ ASSERT_EQ(0, mem_tracker_->consumption());
+ }, MonoDelta::FromSeconds(5));
+ NO_PENDING_FATALS();
+}
+
+
+} // namespace rpc
+} // namespace kudu