You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:26 UTC
[10/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_method.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.h b/be/src/kudu/rpc/remote_method.h
new file mode 100644
index 0000000..5b78dad
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REMOTE_METHOD_H_
+#define KUDU_RPC_REMOTE_METHOD_H_
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethodPB;
+
+// Simple class that acts as a container for a fully qualified remote RPC name
+// and converts to/from RemoteMethodPB.
+// This class is also copyable and assignable for convenience reasons.
+class RemoteMethod {
+ public:
+  RemoteMethod() = default;  // Leaves both names empty.
+  RemoteMethod(std::string service_name, std::string method_name);
+  std::string service_name() const { return service_name_; }  // Returns a copy.
+  std::string method_name() const { return method_name_; }  // Returns a copy.
+
+  // Encode/decode to/from 'pb'.
+  void FromPB(const RemoteMethodPB& pb);
+  void ToPB(RemoteMethodPB* pb) const;
+
+  std::string ToString() const;  // Defined out of line.
+
+ private:
+  std::string service_name_;  // Service part of the remote RPC name.
+  std::string method_name_;   // Method part of the remote RPC name.
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_REMOTE_METHOD_H_
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.cc b/be/src/kudu/rpc/remote_user.cc
new file mode 100644
index 0000000..50e3fcd
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/remote_user.h"
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+string RemoteUser::ToString() const {  // Renders "{username='..'[, principal='..']}".
+  string repr;
+  strings::SubstituteAndAppend(&repr, "{username='$0'", username_);
+  if (principal_ != boost::none) {
+    strings::SubstituteAndAppend(&repr, ", principal='$0'", principal_.get());
+  }
+  repr += "}";
+  return repr;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.h b/be/src/kudu/rpc/remote_user.h
new file mode 100644
index 0000000..7dc0590
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <boost/optional.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Server-side view of the remote authenticated user.
+//
+// This class may be read by multiple threads concurrently after
+// its initialization during RPC negotiation.
+class RemoteUser {
+ public:
+  // How the remote peer proved (or did not prove) its identity.
+  enum Method {
+    // The server did not require authentication; the username supplied by
+    // the peer was accepted without being validated in any way.
+    UNAUTHENTICATED,
+    // Authenticated via Kerberos.
+    KERBEROS,
+    // Authenticated via a Kudu authentication token.
+    AUTHN_TOKEN,
+    // Authenticated via a client certificate.
+    CLIENT_CERT
+  };
+
+  // Mechanism recorded by the most recent Set*() call (UNAUTHENTICATED by default).
+  Method authenticated_by() const { return authenticated_by_; }
+
+  // Username recorded by the most recent Set*() call.
+  const std::string& username() const { return username_; }
+
+  // Returns a copy of the principal; boost::none when no principal was recorded.
+  boost::optional<std::string> principal() const { return principal_; }
+
+  // Records a Kerberos-authenticated user together with its principal.
+  void SetAuthenticatedByKerberos(std::string username, std::string principal) {
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+    authenticated_by_ = KERBEROS;
+  }
+
+  // Records an unauthenticated user; any previously stored principal is cleared.
+  void SetUnauthenticated(std::string username) {
+    username_ = std::move(username);
+    principal_ = boost::none;
+    authenticated_by_ = UNAUTHENTICATED;
+  }
+
+  // Records a certificate-authenticated user; 'principal' may be boost::none.
+  void SetAuthenticatedByClientCert(std::string username,
+                                    boost::optional<std::string> principal) {
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+    authenticated_by_ = CLIENT_CERT;
+  }
+
+  // Records a token-authenticated user; any previously stored principal is cleared.
+  void SetAuthenticatedByToken(std::string username) {
+    username_ = std::move(username);
+    principal_ = boost::none;
+    authenticated_by_ = AUTHN_TOKEN;
+  }
+
+  // Returns a string representation of the object (defined out of line).
+  std::string ToString() const;
+
+ private:
+  // The real username of the remote user. For a Kerberos principal this is
+  // meant to be the already-mapped local username.
+  // TODO(todd): actually do the above mapping.
+  std::string username_;
+
+  // The full principal of the remote user; only set for a strong-authenticated user.
+  boost::optional<std::string> principal_;
+  Method authenticated_by_ = UNAUTHENTICATED;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker-test.cc b/be/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..89ea8a2
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+  const int kNumRpcs = 10;
+
+  scoped_refptr<RequestTracker> tracker(new RequestTracker("test_client"));
+
+  // With nothing issued yet there is no incomplete RPC to report.
+  RequestTracker::SequenceNumber seq_no = tracker->FirstIncomplete();
+  ASSERT_EQ(seq_no, RequestTracker::NO_SEQ_NO);
+
+  vector<RequestTracker::SequenceNumber> issued;
+
+  // Issue kNumRpcs sequence numbers, remembering each in the order handed out.
+  for (int n = 0; n < kNumRpcs; ++n) {
+    ASSERT_OK(tracker->NewSeqNo(&seq_no));
+    issued.push_back(seq_no);
+  }
+
+  // The oldest issued number is now the first incomplete one.
+  ASSERT_EQ(issued[0], tracker->FirstIncomplete());
+
+  // Completing the first incomplete RPC moves the watermark to the next one.
+  tracker->RpcCompleted(tracker->FirstIncomplete());
+
+  ASSERT_EQ(issued[1], tracker->FirstIncomplete());
+
+  // Completing an RPC in the middle leaves the first incomplete one unchanged.
+  tracker->RpcCompleted(issued[5]);
+  ASSERT_EQ(issued[1], tracker->FirstIncomplete());
+
+  // Complete the first half. issued[0] was already completed above, so this also
+  // checks that RpcCompleted() is idempotent. Since issued[5] is done as well,
+  // the first incomplete RPC becomes issued[6].
+  for (int n = 0; n < kNumRpcs / 2; ++n) {
+    tracker->RpcCompleted(issued[n]);
+  }
+
+  ASSERT_EQ(issued[6], tracker->FirstIncomplete());
+
+  for (int n = kNumRpcs / 2; n <= kNumRpcs; ++n) {
+    ASSERT_OK(tracker->NewSeqNo(&seq_no));
+    issued.push_back(seq_no);
+  }
+
+  // Once every issued number has been completed, FirstIncomplete() reports
+  // NO_SEQ_NO again.
+  for (const auto finished : issued) {
+    tracker->RpcCompleted(finished);
+  }
+
+  ASSERT_EQ(tracker->FirstIncomplete(), RequestTracker::NO_SEQ_NO);
+}
+
+} // namespace rpc
+} // namespace kudu
+
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.cc b/be/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..1958664
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+const RequestTracker::SequenceNumber RequestTracker::NO_SEQ_NO = -1;  // Returned by FirstIncomplete() when no RPC is incomplete.
+
+RequestTracker::RequestTracker(const std::string& client_id)
+    : client_id_(client_id),  // Copied once; the id is not modified afterwards.
+      next_(0) {}  // Sequence numbers are handed out starting at 0.
+
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  // Hand out the current counter value, advance it, and track it as incomplete.
+  *seq_no = next_++;
+  InsertOrDie(&incomplete_rpcs_, *seq_no);
+  return Status::OK();
+}
+
+RequestTracker::SequenceNumber RequestTracker::FirstIncomplete() {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  // The first element of the set is reported; NO_SEQ_NO stands in when it is empty.
+  return incomplete_rpcs_.empty() ? NO_SEQ_NO : *incomplete_rpcs_.begin();
+}
+
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  incomplete_rpcs_.erase(seq_no);  // No-op when 'seq_no' is not (or no longer) tracked.
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.h b/be/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..99f8d6c
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are different from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+  static const RequestTracker::SequenceNumber NO_SEQ_NO;
+  explicit RequestTracker(const std::string& client_id);
+
+  // Creates a new, unique, sequence number and stores it in 'seq_no'.
+  // Sequence numbers are assigned in increasing integer order.
+  // The Status return leaves room for reporting Status::ServiceUnavailable() when too
+  // many RPCs are in-flight (the caller should then try again later); the current
+  // implementation in request_tracker.cc always returns Status::OK().
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns the sequence number of the first incomplete RPC.
+  // If there is no incomplete RPC returns NO_SEQ_NO.
+  SequenceNumber FirstIncomplete();
+
+  // Marks the rpc with 'seq_no' as completed.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker. Needs no lock: the id is const.
+  const std::string& client_id() const { return client_id_; }
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/response_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/response_callback.h b/be/src/kudu/rpc/response_callback.h
new file mode 100644
index 0000000..8c4fc03
--- /dev/null
+++ b/be/src/kudu/rpc/response_callback.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RESPONSE_CALLBACK_H
+#define KUDU_RPC_RESPONSE_CALLBACK_H
+
+#include <boost/function.hpp>
+
+namespace kudu {
+namespace rpc {
+
+typedef boost::function<void()> ResponseCallback;  // Callable taking no arguments and returning nothing.
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/result_tracker.cc b/be/src/kudu/rpc/result_tracker.cc
new file mode 100644
index 0000000..11ff8d2
--- /dev/null
+++ b/be/src/kudu/rpc/result_tracker.cc
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/result_tracker.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+DEFINE_int64(remember_clients_ttl_ms, 3600 * 1000 /* 1 hour */,
+ "Maximum amount of time, in milliseconds, the server \"remembers\" a client for the "
+ "purpose of caching its responses. After this period without hearing from it, the "
+ "client is no longer remembered and the memory occupied by its responses is reclaimed. "
+ "Retries of requests older than 'remember_clients_ttl_ms' are treated as new "
+ "ones.");
+TAG_FLAG(remember_clients_ttl_ms, advanced);
+
+DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */,
+ "Maximum amount of time, in milliseconds, the server \"remembers\" a response to a "
+ "specific request for a client. After this period has elapsed, the response may have "
+ "been garbage collected and the client might get a response indicating the request is "
+ "STALE.");
+TAG_FLAG(remember_responses_ttl_ms, advanced);
+
+DEFINE_int64(result_tracker_gc_interval_ms, 1000 /* presumably 1 second, per the _ms suffix */,
+ "Interval at which the result tracker will look for entries to GC.");
+TAG_FLAG(result_tracker_gc_interval_ms, hidden);
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::Message;
+using kudu::MemTracker;
+using rpc::InboundCall;
+using std::move;
+using std::lock_guard;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+// This tracks the size changes of anything that has a memory_footprint() method.
+// It must be instantiated before the updates, and it makes sure that the MemTracker
+// is updated on scope exit.
+template <class T>
+struct ScopedMemTrackerUpdater {
+  // Snapshots the footprint of 'tracked_' so the destructor can report the delta.
+  ScopedMemTrackerUpdater(MemTracker* tracker_, const T* tracked_)
+      : tracker(tracker_),
+        tracked(tracked_),
+        memory_before(tracked_->memory_footprint()),
+        cancelled(false) {}
+
+  // Releases (before - after) bytes from the tracker unless Cancel() was called.
+  ~ScopedMemTrackerUpdater() {
+    if (!cancelled) {
+      tracker->Release(memory_before - tracked->memory_footprint());
+    }
+  }
+
+  void Cancel() { cancelled = true; }  // Suppresses the update on scope exit.
+
+  MemTracker* tracker;
+  const T* tracked;
+  int64_t memory_before;
+  bool cancelled;
+};
+
+ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
+ : mem_tracker_(std::move(mem_tracker)),
+ clients_(ClientStateMap::key_compare(),
+ ClientStateMapAllocator(mem_tracker_)),  // Uses the member, not the moved-from parameter. NOTE(review): relies on mem_tracker_ being declared before clients_ -- confirm in the header.
+ gc_thread_stop_latch_(1) {}  // The destructor counts this latch down before joining the GC thread.
+
+ResultTracker::~ResultTracker() {
+  // Stop and join the GC thread (if one exists) before touching tracker state.
+  if (gc_thread_) {
+    gc_thread_stop_latch_.CountDown();
+    gc_thread_->Join();
+  }
+  lock_guard<simple_spinlock> guard(lock_);
+  // Release all the memory for the stuff we'll delete on destruction.
+  const auto drop_all = [] (SequenceNumber, CompletionRecord*) { return true; };
+  for (auto& entry : clients_) {
+    entry.second->GCCompletionRecords(mem_tracker_, drop_all);
+    mem_tracker_->Release(entry.second->memory_footprint());
+  }
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
+                                                Message* response,
+                                                RpcContext* context) {
+  lock_guard<simple_spinlock> guard(lock_);  // Held for the whole tracking step.
+  return TrackRpcUnlocked(request_id, response, context);
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id,
+ Message* response,
+ RpcContext* context) {
+ ClientState* client_state = ComputeIfAbsent(  // Look up this client's state; the lambda builds it when absent.
+ &clients_,
+ request_id.client_id(),
+ [&]{
+ unique_ptr<ClientState> client_state(new ClientState(mem_tracker_));
+ mem_tracker_->Consume(client_state->memory_footprint());  // Account for the new ClientState.
+ client_state->stale_before_seq_no = request_id.first_incomplete_seq_no();
+ return client_state;
+ })->get();
+
+ client_state->last_heard_from = MonoTime::Now();
+
+ // If the arriving request is older than our per-client GC watermark, report its
+ // staleness to the client (when there is a context to respond through).
+ if (PREDICT_FALSE(request_id.seq_no() < client_state->stale_before_seq_no)) {
+ if (context) {
+ context->call_->RespondFailure(
+ ErrorStatusPB::ERROR_REQUEST_STALE,
+ Status::Incomplete(Substitute("Request with id { $0 } is stale.",
+ SecureShortDebugString(request_id))));
+ delete context;  // The failure response was sent above; the context is disposed of here.
+ }
+ return RpcState::STALE;
+ }
+
+ // GC records according to the client's first incomplete watermark; records that
+ // are still IN_PROGRESS are never collected here.
+ client_state->GCCompletionRecords(
+ mem_tracker_,
+ [&] (SequenceNumber seq_no, CompletionRecord* completion_record) {
+ return completion_record->state != RpcState::IN_PROGRESS &&
+ seq_no < request_id.first_incomplete_seq_no();
+ });
+
+ auto result = ComputeIfAbsentReturnAbsense(  // result.second is treated below as "record was newly created".
+ &client_state->completion_records,
+ request_id.seq_no(),
+ [&]{
+ unique_ptr<CompletionRecord> completion_record(new CompletionRecord(
+ RpcState::IN_PROGRESS, request_id.attempt_no()));
+ mem_tracker_->Consume(completion_record->memory_footprint());  // Account for the new record.
+ return completion_record;
+ });
+
+ CompletionRecord* completion_record = result.first->get();
+ ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);  // Reports footprint changes made below on scope exit.
+
+ if (PREDICT_TRUE(result.second)) {
+ // When a follower is applying an operation it doesn't have a response yet, and it won't
+ // have a context, so only set them if they exist.
+ if (response != nullptr) {
+ completion_record->ongoing_rpcs.push_back({response,
+ DCHECK_NOTNULL(context),
+ request_id.attempt_no()});
+ }
+ return RpcState::NEW;
+ }
+
+ completion_record->last_updated = MonoTime::Now();  // Reached only for a record that already existed.
+ switch (completion_record->state) {
+ case RpcState::COMPLETED: {
+ // If the RPC is COMPLETED and the request originates from a client (context, response are
+ // non-null) copy the response and reply immediately. If there is no context/response
+ // do nothing.
+ if (context != nullptr) {
+ DCHECK_NOTNULL(response)->CopyFrom(*completion_record->response);
+ context->call_->RespondSuccess(*response);
+ delete context;  // The stored response was sent above; the context is disposed of here.
+ }
+ return RpcState::COMPLETED;
+ }
+ case RpcState::IN_PROGRESS: {
+ // If the RPC is IN_PROGRESS check if there is a context and, if so, attach it
+ // so that the rpc gets the same response when the original one completes.
+ if (context != nullptr) {
+ completion_record->ongoing_rpcs.push_back({DCHECK_NOTNULL(response),
+ context,
+ NO_HANDLER});
+ }
+ return RpcState::IN_PROGRESS;
+ }
+ default:
+ LOG(FATAL) << "Wrong state: " << completion_record->state;
+ // Dummy return to avoid compiler warnings; placed after the LOG(FATAL) above.
+ return RpcState::STALE;
+ }
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> guard(lock_);
+  // Track with no response and no context attached.
+  const RpcState tracked_state = TrackRpcUnlocked(request_id, nullptr, nullptr);
+  if (tracked_state != RpcState::IN_PROGRESS) {
+    return tracked_state;
+  }
+
+  // The RPC is already in progress: make this attempt its driver.
+  CompletionRecord* record = FindCompletionRecordOrDieUnlocked(request_id);
+  ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), record);
+  const auto attempt_no = request_id.attempt_no();
+  record->driver_attempt_no = attempt_no;
+  record->ongoing_rpcs.push_back({nullptr, nullptr, attempt_no});
+
+  // Since we changed the driver of the RPC, return NEW, so that the caller knows
+  // to store the result.
+  return RpcState::NEW;
+}
+
+bool ResultTracker::IsCurrentDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> guard(lock_);
+  const CompletionRecord* record = FindCompletionRecordOrNullUnlocked(request_id);
+
+  // A missing record (someone might have called FailAndRespond()) means this
+  // attempt is not the driver. Otherwise it is the driver exactly when its
+  // attempt number matches the one stored in the record.
+  if (record != nullptr) {
+    return record->driver_attempt_no == request_id.attempt_no();
+  }
+  return false;
+}
+
+void ResultTracker::LogAndTraceAndRespondSuccess(RpcContext* context,
+ const Message& msg) {
+ InboundCall* call = context->call_;  // 'context' must be non-null: it is dereferenced unconditionally.
+ VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC success "
+ "response for " << call->ToString() << ":" << std::endl << SecureDebugString(msg);
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(msg),
+ "trace", context->trace()->DumpToString());
+ call->RespondSuccess(msg);
+ delete context;  // 'context' is destroyed here and must not be used by the caller afterwards.
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context,
+ const Message& msg) {
+ InboundCall* call = context->call_;  // Only logs and traces: this overload neither responds to the call nor deletes 'context'.
+ VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure "
+ "response for " << call->ToString() << ": " << SecureDebugString(msg);
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(msg),
+ "trace", context->trace()->DumpToString());
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context,
+ ErrorStatusPB_RpcErrorCodePB err,  // Not referenced in this function's body.
+ const Status& status) {
+ InboundCall* call = context->call_;  // Only logs and traces: this overload neither responds to the call nor deletes 'context'.
+ VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure "
+ "response for " << call->ToString() << ": " << status.ToString();
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "status", status.ToString(),
+ "trace", context->trace()->DumpToString());
+}
+
+ResultTracker::CompletionRecord* ResultTracker::FindCompletionRecordOrDieUnlocked(
+ const RequestIdPB& request_id) {  // NOTE(review): DCHECK_NOTNULL may be compiled out of release builds, so "OrDie" may only hold in debug builds -- confirm.
+ ClientState* client_state = DCHECK_NOTNULL(FindPointeeOrNull(clients_, request_id.client_id()));
+ return DCHECK_NOTNULL(FindPointeeOrNull(client_state->completion_records, request_id.seq_no()));
+}
+
+pair<ResultTracker::ClientState*, ResultTracker::CompletionRecord*>
+ResultTracker::FindClientStateAndCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) {
+  // Look up the client first; the record lookup is skipped for an unknown client.
+  ClientState* const state = FindPointeeOrNull(clients_, request_id.client_id());
+  CompletionRecord* const record = (state == nullptr)
+      ? nullptr
+      : FindPointeeOrNull(state->completion_records, request_id.seq_no());
+  return make_pair(state, record);
+}
+
+ResultTracker::CompletionRecord*
+ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) {
+ return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second;  // Null when either the client or the record is unknown.
+}
+
+void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
+ const Message* response) {
+ vector<OnGoingRpcInfo> to_respond;  // Filled under the lock, answered after it is released.
+ {
+ lock_guard<simple_spinlock> l(lock_);
+
+ CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+ ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
+
+ CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
+ << "Called RecordCompletionAndRespond() from an executor identified with an "
+ << "attempt number that was not marked as the driver for the RPC. RequestId: "
+ << SecureShortDebugString(request_id) << "\nTracker state:\n " << ToStringUnlocked();
+ DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
+ completion_record->response.reset(DCHECK_NOTNULL(response)->New());  // Keep a private copy of the response in the record.
+ completion_record->response->CopyFrom(*response);
+ completion_record->state = RpcState::COMPLETED;
+ completion_record->last_updated = MonoTime::Now();
+
+ CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no());  // Same condition as the CHECK_EQ above.
+
+ int64_t handler_attempt_no = request_id.attempt_no();
+
+ // Walk the ongoing RPCs from the most recently added one, removing those this handler must handle and collecting the ones that have a context to answer.
+ for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+ orpc_iter != completion_record->ongoing_rpcs.rend();) {
+
+ const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+ if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+ if (ongoing_rpc.context != nullptr) {
+ to_respond.push_back(ongoing_rpc);
+ }
+ ++orpc_iter;  // Advance first so that base() refers to the element just examined.
+ orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+ completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+ } else {
+ ++orpc_iter;
+ }
+ }
+ }
+
+ // Respond outside of holding the lock. This reduces lock contention and also
+ // means that we will have fully updated our memory tracking before responding,
+ // which makes testing easier.
+ for (auto& ongoing_rpc : to_respond) {
+ if (PREDICT_FALSE(ongoing_rpc.response != response)) {  // Skip the self-copy when this entry already points at 'response'.
+ ongoing_rpc.response->CopyFrom(*response);
+ }
+ LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response);
+ }
+}
+
+// Shared implementation behind the FailAndRespond() overloads.
+//
+// Under 'lock_', detaches from the completion record every ongoing attempt that the
+// handler identified by 'request_id' must handle (see MustHandleRpc()) and, if nothing
+// is left and the RPC never completed, drops the record itself. Once the lock has been
+// released, 'func' is invoked for every detached attempt that has a context, and that
+// context is then deleted.
+void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
+                                           HandleOngoingRpcFunc func) {
+  vector<OnGoingRpcInfo> detached_rpcs;
+  {
+    lock_guard<simple_spinlock> l(lock_);
+    auto lookup = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+    ClientState* client_state = lookup.first;
+    CompletionRecord* record = lookup.second;
+
+    if (PREDICT_FALSE(client_state == nullptr)) {
+      LOG(FATAL) << "Couldn't find ClientState for request: " << SecureShortDebugString(request_id)
+                 << ". \nTracker state:\n" << ToStringUnlocked();
+    }
+
+    // Unlike RecordCompletionAndRespond(), this method may legitimately be called for
+    // an RPC that was never tracked, e.g. when a follower transaction fails on the
+    // TransactionManager before being tracked and its CompletionCallback still ends up
+    // here. There is nothing to do in that case.
+    if (record == nullptr) {
+      return;
+    }
+
+    ScopedMemTrackerUpdater<CompletionRecord> record_updater(mem_tracker_.get(), record);
+    record->last_updated = MonoTime::Now();
+
+    const int64_t seq_no = request_id.seq_no();
+    const int64_t handler_attempt_no = request_id.attempt_no();
+
+    // Walk the ongoing RPCs from the back towards the front. When the response being
+    // copied from belongs to a client originated call, that call has to be replied to
+    // last, otherwise 'response' would be lost before all attempts were served.
+    vector<OnGoingRpcInfo>& pending = record->ongoing_rpcs;
+    for (size_t idx = pending.size(); idx-- > 0;) {
+      if (!MustHandleRpc(handler_attempt_no, record, pending[idx])) {
+        continue;
+      }
+      detached_rpcs.push_back(pending[idx]);
+      pending.erase(pending.begin() + idx);
+    }
+
+    // Nobody else is attempting this RPC and it did not complete: forget the record.
+    if (pending.empty() && record->state != RpcState::COMPLETED) {
+      // The record is about to be destroyed, so the updater must not touch it again.
+      record_updater.Cancel();
+      unique_ptr<CompletionRecord> erased_record =
+          EraseKeyReturnValuePtr(&client_state->completion_records, seq_no);
+      mem_tracker_->Release(erased_record->memory_footprint());
+    }
+  }
+
+  // The heavy-weight work happens only after the lock has been released.
+  for (auto& detached : detached_rpcs) {
+    if (detached.context == nullptr) {
+      continue;
+    }
+    func(detached);
+    delete detached.context;
+  }
+}
+
+// Fails the RPC identified by 'request_id', answering every attempt handled by this
+// handler with 'response' (sent through RespondSuccess()).
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id, Message* response) {
+  FailAndRespondInternal(request_id, [&](const OnGoingRpcInfo& attempt) {
+    // RPCs are usually executed just once, in which case the attempt already owns
+    // 'response' and no copy is needed.
+    const bool has_own_response = attempt.response != response;
+    if (PREDICT_FALSE(has_own_response)) {
+      attempt.response->CopyFrom(*response);
+    }
+    LogAndTraceFailure(attempt.context, *response);
+    attempt.context->call_->RespondSuccess(*response);
+  });
+}
+
+// Fails the RPC identified by 'request_id', answering every attempt handled by this
+// handler with the RPC-level error 'err' and 'status'.
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
+  FailAndRespondInternal(request_id, [&](const OnGoingRpcInfo& attempt) {
+    LogAndTraceFailure(attempt.context, err, status);
+    attempt.context->call_->RespondFailure(err, status);
+  });
+}
+
+// Fails the RPC identified by 'request_id', answering every attempt handled by this
+// handler with an application error built from 'error_ext_id', 'message' and
+// 'app_error_pb'.
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   int error_ext_id, const string& message,
+                                   const Message& app_error_pb) {
+  FailAndRespondInternal(request_id, [&](const OnGoingRpcInfo& attempt) {
+    LogAndTraceFailure(attempt.context, app_error_pb);
+    attempt.context->call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+  });
+}
+
+// Spawns the "result-tracker" thread, which runs RunGCThread(). Crashes if the thread
+// was already started or cannot be created.
+void ResultTracker::StartGCThread() {
+  CHECK(!gc_thread_);
+  const Status create_status = Thread::Create(
+      "server", "result-tracker", &ResultTracker::RunGCThread, this, &gc_thread_);
+  CHECK_OK(create_status);
+}
+
+// Body of the GC thread: runs GCResults() each time the stop latch wait times out,
+// and returns as soon as a wait on the latch succeeds.
+void ResultTracker::RunGCThread() {
+  for (;;) {
+    // The flag is read anew on every iteration.
+    const MonoDelta gc_interval =
+        MonoDelta::FromMilliseconds(FLAGS_result_tracker_gc_interval_ms);
+    if (gc_thread_stop_latch_.WaitFor(gc_interval)) {
+      return;
+    }
+    GCResults();
+  }
+}
+
+// Time-based garbage collection of ClientStates and CompletionRecords; holds 'lock_'
+// for the whole pass.
+void ResultTracker::GCResults() {
+  lock_guard<simple_spinlock> l(lock_);
+  const MonoTime now = MonoTime::Now();
+
+  // Anything last touched before these instants is old enough to be collected.
+  MonoTime client_cutoff = now;
+  client_cutoff.AddDelta(MonoDelta::FromMilliseconds(-FLAGS_remember_clients_ttl_ms));
+  MonoTime response_cutoff = now;
+  response_cutoff.AddDelta(MonoDelta::FromMilliseconds(-FLAGS_remember_responses_ttl_ms));
+
+  auto iter = clients_.begin();
+  while (iter != clients_.end()) {
+    auto& client_state = iter->second;
+
+    if (!(client_state->last_heard_from < client_cutoff)) {
+      // The client was heard from recently: keep it, but collect those of its
+      // responses that are old and no longer in progress.
+      client_state->GCCompletionRecords(
+          mem_tracker_,
+          [&] (SequenceNumber, CompletionRecord* record) {
+            return record->state != RpcState::IN_PROGRESS &&
+                record->last_updated < response_cutoff;
+          });
+      ++iter;
+      continue;
+    }
+
+    // The client has been silent for too long: collect its completion records,
+    // stopping at the first one that is still being executed.
+    bool found_in_progress = false;
+    client_state->GCCompletionRecords(
+        mem_tracker_,
+        [&] (SequenceNumber, CompletionRecord* record) {
+          if (PREDICT_FALSE(record->state == RpcState::IN_PROGRESS)) {
+            found_in_progress = true;
+            return false;
+          }
+          return true;
+        });
+
+    // A client with a request still in execution must not be deleted.
+    if (PREDICT_FALSE(found_in_progress)) {
+      ++iter;
+      continue;
+    }
+    mem_tracker_->Release(client_state->memory_footprint());
+    iter = clients_.erase(iter);
+  }
+}
+
+// Locked wrapper around ToStringUnlocked(): takes 'lock_' for the duration of the dump.
+string ResultTracker::ToString() {
+  lock_guard<simple_spinlock> guard(lock_);
+  string dump = ToStringUnlocked();
+  return dump;
+}
+
+// Renders this tracker and every ClientState it holds as a human-readable string.
+// Does not acquire 'lock_'.
+string ResultTracker::ToStringUnlocked() const {
+  string result = Substitute("ResultTracker[this: $0, Num. Client States: $1, Client States:\n",
+                             this, clients_.size());
+  for (const auto& cs : clients_) {
+    // Pass the pattern and its arguments straight to SubstituteAndAppend(). Feeding it
+    // an already-substituted string would use that string as the format, so any '$' in
+    // a client id or in the nested dump would be treated as a placeholder.
+    SubstituteAndAppend(&result, "\n\tClient: $0, $1", cs.first, cs.second->ToString());
+  }
+  result.append("]");
+  return result;
+}
+
+// Garbage collects completion records starting from the first entry of
+// 'completion_records' for as long as 'must_gc_record_func' returns true for that
+// entry. Each collected record has its footprint released from 'mem_tracker' and
+// pushes 'stale_before_seq_no' past its sequence number.
+template<class MustGcRecordFunc>
+void ResultTracker::ClientState::GCCompletionRecords(
+    const shared_ptr<kudu::MemTracker>& mem_tracker,
+    MustGcRecordFunc must_gc_record_func) {
+  ScopedMemTrackerUpdater<ClientState> updater(mem_tracker.get(), this);
+  while (!completion_records.empty()) {
+    auto oldest = completion_records.begin();
+    // Completion records are stored in order, so once one of them must be kept
+    // nothing after it is collected either.
+    if (!must_gc_record_func(oldest->first, oldest->second.get())) {
+      break;
+    }
+    const SequenceNumber deleted_seq_no = oldest->first;
+    mem_tracker->Release(oldest->second->memory_footprint());
+    completion_records.erase(oldest);
+    // Advance the watermark so that a later request whose sequence number is lower
+    // than or equal to the collected one can be told that its response is stale.
+    stale_before_seq_no = std::max(deleted_seq_no + 1, stale_before_seq_no);
+  }
+}
+
+// Renders this client's state, including each of its completion records, as a
+// human-readable string.
+string ResultTracker::ClientState::ToString() const {
+  auto since_last_heard =
+      MonoTime::Now().GetDeltaSince(last_heard_from);
+  string result = Substitute("Client State[Last heard from: $0s ago, "
+                             "$1 CompletionRecords:",
+                             since_last_heard.ToString(),
+                             completion_records.size());
+  for (const auto& entry : completion_records) {
+    // Append with the pattern itself as the format. Formatting first and then using
+    // the result as a format would make any '$' inside the nested record dump be
+    // treated as a placeholder.
+    SubstituteAndAppend(&result, "\n\tCompletion Record: $0, $1",
+                        entry.first,
+                        entry.second->ToString());
+  }
+  result.append("\t]");
+  return result;
+}
+
+// Renders this completion record (state, driver attempt, cached response if any, and
+// every ongoing RPC attached to it) as a human-readable string.
+string ResultTracker::CompletionRecord::ToString() const {
+  string result = Substitute("Completion Record[State: $0, Driver: $1, "
+                             "Cached response: $2, $3 OngoingRpcs:",
+                             state,
+                             driver_attempt_no,
+                             response ? SecureShortDebugString(*response) : "None",
+                             ongoing_rpcs.size());
+  for (const auto& orpc : ongoing_rpcs) {
+    // The ongoing RPC dump embeds a protobuf debug string, i.e. arbitrary text. It has
+    // to be passed as an argument, never as the format, or a '$' inside it would be
+    // treated as a placeholder.
+    SubstituteAndAppend(&result, "\n\t$0", orpc.ToString());
+  }
+  result.append("\t\t]");
+  return result;
+}
+
+// Renders this ongoing RPC (handler attempt number, context pointer and response, or
+// "NULL" when there is no response) as a human-readable string.
+string ResultTracker::OnGoingRpcInfo::ToString() const {
+  const string response_text = response ? SecureShortDebugString(*response) : "NULL";
+  return Substitute("OngoingRpc[Handler: $0, Context: $1, Response: $2]",
+                    handler_attempt_no, context, response_text);
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/result_tracker.h b/be/src/kudu/rpc/result_tracker.h
new file mode 100644
index 0000000..f629d7a
--- /dev/null
+++ b/be/src/kudu/rpc/result_tracker.h
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/thread.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // protobuf
+} // google
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+
+// A ResultTracker for RPC results.
+//
+// The ResultTracker is responsible for tracking the results of RPCs and making sure that
+// client calls with the same client ID and sequence number (first attempt and subsequent retries)
+// are executed exactly once.
+//
+// In most cases, the use of ResultTracker is internal to the RPC system: RPCs are tracked when
+// they first arrive, before service methods are called, and calls to ResultTracker to store
+// responses are performed internally by RpcContext. The exception is when an RPC is replicated
+// across multiple servers, such as with writes, in which case direct interaction with the result
+// tracker is required so as to cache responses on replicas which did not receive the RPC directly
+// from the client.
+//
+// Throughout this header and elsewhere we use the following terms:
+//
+// RPC - The operation that a client or another server wants to execute on this server. The client
+// might attempt one RPC many times, for instance if failures or timeouts happen.
+// Attempt - Each individual attempt of an RPC on the server.
+// Handler - A thread executing an attempt. Usually there is only one handler that executes the
+// first attempt of an RPC and, when it completes, replies to its own attempt and to all
+// other attempts that might have arrived after it started.
+// Driver - Only important in cases where there might be multiple handlers (e.g. in replicated
+// RPCs). In these cases there might be two handlers executing the same RPC, corresponding
+// to different attempts. Since the RPC must be executed exactly once, only one of the
+// handlers must be selected as the "driver" and actually perform the operation.
+//
+// If a client wishes to track the result of a given RPC it must send on the RPC header
+// a RequestId with the following information:
+//
+// Client ID - Uniquely identifies a single client. All the RPCs originating from the same
+// client must have the same ID.
+// Sequence number - Uniquely identifies a single RPC, even across retries to multiple servers, for
+// replicated RPCs. All retries of the same RPC must have the same sequence
+// number.
+// Attempt number - Uniquely identifies each retry of the same RPC. All retries of the same RPC
+// must have different attempt numbers.
+//
+// When a call first arrives from the client the RPC subsystem will call TrackRpc() which
+// will return the state of the RPC in the form of an RpcState enum.
+//
+// If the ResultTracker returns NEW, this signals that it's the first time the server has heard
+// of the RPC and that the corresponding server function should be executed.
+//
+// If anything other than NEW is returned it means that the call has either previously completed or
+// is in the process of being executed. In this case the caller should _not_ execute the function
+// corresponding to the RPC. The ResultTracker itself will take care of responding to the client
+// appropriately. If the RPC was already completed, the ResultTracker replies to the client
+// immediately. If the RPC is still ongoing, the attempt gets "attached" to the ongoing one and will
+// receive the same response when its handler finishes.
+//
+// If handling of the RPC is successful, RecordCompletionAndRespond() must be called
+// to register successful completion, in which case all pending or future RPCs with the same
+// sequence number, from the same client, will receive the same response.
+//
+// On the other hand, if execution of the server function is not successful then one of
+// the FailAndRespond() methods should be called, causing all _pending_ attempts to receive the same
+// error. However this error is not stored, any future attempt with the same sequence number and
+// same client ID will be given a new chance to execute, as if it had never been tried before.
+// This gives the client a chance to either retry (if the failure reason is transient) or give up.
+//
+// ============================================================================
+// RPCs with multiple handlers
+// ============================================================================
+//
+// Some RPCs' results are tracked by a single server, i.e. they correspond to the modification of an
+// unreplicated resource and are unpersisted. For those no additional care needs to be taken, the
+// first attempt will be the only handler, and subsequent attempts will receive the response when
+// that first attempt is done.
+// However some RPCs are replicated across servers, using consensus, and thus can have multiple
+// handlers executing different attempts at the same time, e.g. one handler from a client
+// originating retry, and one from a previous leader originating update.
+//
+// In this case we need to make sure that the following invariants are enforced:
+// - Only one handler can actually record a response, the "driver" handler.
+// - Only one handler must respond to "attached" attempts.
+// - Each handler replies to their own RPCs, to avoid races. That is, a live handler should
+// not mutate another live handler's response/context.
+//
+// This is achieved by naming one handler the "driver" of the RPC and making sure that only
+// the driver can successfully complete it, i.e. call RecordCompletionAndRespond().
+//
+// In order to make sure there is only one driver, there must be an _external_ serialization
+// point, before the final response is produced, after which only one of the handlers will
+// be marked as the driver. For instance, for writes, this serialization point is in
+// TransactionDriver, in a synchronized block where a logic such as this one happens (here
+// in pseudo-ish code):
+//
+// {
+// lock_guard<simple_spinlock> l(lock_);
+// if (follower_transaction) {
+// result_tracker_->TrackRpcOrChangeDriver(request_id);
+// continue_with_transaction();
+// } else if (client_transaction) {
+// bool is_still_driver = result_tracker_->IsCurrentDriver(request_id);
+// if (is_still_driver) continue_with_transaction();
+// else abort_transaction();
+// }
+// }
+//
+// This class is thread safe.
+class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
+ public:
+  typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+  // Value of OnGoingRpcInfo::handler_attempt_no for an attempt that has no handler of
+  // its own, i.e. one that was attached to an ongoing RPC (see MustHandleRpc()).
+  static const int NO_HANDLER = -1;
+  // Enum returned by TrackRpc that reflects the state of the RPC.
+  enum RpcState {
+    // The RPC is new.
+    NEW,
+    // The RPC has previously completed and the same response has been sent
+    // to the client.
+    COMPLETED,
+    // The RPC is currently in-progress and, when it completes, the same response
+    // will be sent to the client.
+    IN_PROGRESS,
+    // The RPC's state is stale, meaning it's older than our per-client garbage
+    // collection watermark and we do not recall the original response.
+    STALE
+  };
+
+  explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker);
+  ~ResultTracker();
+
+  // Tracks the RPC and returns its current state.
+  //
+  // If the RpcState == NEW the caller is supposed to actually start executing the RPC.
+  // The caller still owns the passed 'response' and 'context'.
+  //
+  // If the RpcState is anything else all remaining actions will be taken care of internally,
+  // i.e. the caller no longer needs to execute the RPC and this takes ownership of the passed
+  // 'response' and 'context'.
+  RpcState TrackRpc(const RequestIdPB& request_id,
+                    google::protobuf::Message* response,
+                    RpcContext* context);
+
+  // Used to track RPC attempts which originate from other replicas, and which may race with
+  // client originated ones.
+  // Tracks the RPC if it is untracked or changes the current driver of this RPC, i.e. sets the
+  // attempt number in 'request_id' as the driver of the RPC, if it is tracked and IN_PROGRESS.
+  RpcState TrackRpcOrChangeDriver(const RequestIdPB& request_id);
+
+  // Checks if the attempt at an RPC identified by 'request_id' is the current driver of the
+  // RPC. That is, if the attempt number in 'request_id' corresponds to the attempt marked
+  // as the driver of this RPC, either by initially getting NEW from TrackRpc() or by
+  // explicit driver change with TrackRpcOrChangeDriver().
+  bool IsCurrentDriver(const RequestIdPB& request_id);
+
+  // Records the completion of a successful operation.
+  // This will respond to all RPCs from the same client with the same sequence_number.
+  // The response will be stored so that any future retries of this RPC get the same response.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt identified by 'request_id' is the current driver
+  // of the RPC.
+  void RecordCompletionAndRespond(const RequestIdPB& request_id,
+                                  const google::protobuf::Message* response);
+
+  // Responds to all RPCs identified by 'client_id' and 'sequence_number' with the same response,
+  // but doesn't actually store the response.
+  // This should be called when the RPC failed validation or if some transient error occurred.
+  // Based on the response the client can then decide whether to retry the RPC (which will
+  // be treated as a new one) or to give up.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt identified by 'request_id' is the current driver
+  // of the RPC.
+  void FailAndRespond(const RequestIdPB& request_id,
+                      google::protobuf::Message* response);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      ErrorStatusPB_RpcErrorCodePB err, const Status& status);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      int error_ext_id, const std::string& message,
+                      const google::protobuf::Message& app_error_pb);
+
+  // Start a background thread which periodically runs GCResults().
+  // This thread is automatically stopped in the destructor.
+  //
+  // Must be called at most once.
+  void StartGCThread();
+
+  // Runs time-based garbage collection on the results this result tracker is caching.
+  // When garbage collection runs, it goes through all ClientStates and:
+  // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
+  //   requests are in progress, GCs the ClientState and all its CompletionRecords.
+  // - If a ClientState is newer than the 'remember_clients_ttl_ms' flag, goes
+  //   through all CompletionRecords and:
+  //   - If the CompletionRecord is older than the 'remember_responses_ttl_ms' flag,
+  //     GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
+  //
+  // Typically this is invoked from an internal thread started by 'StartGCThread()'.
+  void GCResults();
+
+  // Returns a human-readable dump of the tracker's state. Acquires 'lock_'.
+  std::string ToString();
+
+ private:
+  // Information about client originated ongoing RPCs.
+  // The lifecycle of 'response' and 'context' is managed by the RPC layer.
+  struct OnGoingRpcInfo {
+    google::protobuf::Message* response;
+    RpcContext* context;
+    int64_t handler_attempt_no;
+
+    std::string ToString() const;
+  };
+  // A completion record for an IN_PROGRESS or COMPLETED RPC.
+  struct CompletionRecord {
+    CompletionRecord(RpcState state, int64_t driver_attempt_no)
+        : state(state),
+          driver_attempt_no(driver_attempt_no),
+          last_updated(MonoTime::Now()) {
+    }
+
+    // The current state of the RPC.
+    RpcState state;
+
+    // The attempt number that is/was "driving" this RPC.
+    int64_t driver_attempt_no;
+
+    // The timestamp of the last CompletionRecord update.
+    MonoTime last_updated;
+
+    // The cached response, if this RPC is in COMPLETED state.
+    std::unique_ptr<google::protobuf::Message> response;
+
+    // The set of ongoing RPCs that correspond to this record.
+    std::vector<OnGoingRpcInfo> ongoing_rpcs;
+
+    std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this)
+          + (ongoing_rpcs.capacity() > 0 ? kudu_malloc_usable_size(ongoing_rpcs.data()) : 0)
+          + (response.get() != nullptr ? response->SpaceUsed() : 0);
+    }
+  };
+
+  // The state corresponding to a single client.
+  struct ClientState {
+    typedef MemTrackerAllocator<
+        std::pair<const SequenceNumber,
+                  std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator;
+    typedef std::map<SequenceNumber,
+                     std::unique_ptr<CompletionRecord>,
+                     std::less<SequenceNumber>,
+                     CompletionRecordMapAllocator> CompletionRecordMap;
+
+    explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
+        : stale_before_seq_no(0),
+          completion_records(CompletionRecordMap::key_compare(),
+                             CompletionRecordMapAllocator(std::move(mem_tracker))) {}
+
+    // The last time we've heard from this client.
+    MonoTime last_heard_from;
+
+    // The sequence number of the first response we remember for this client.
+    // All sequence numbers before this one are considered STALE.
+    SequenceNumber stale_before_seq_no;
+
+    // The (un gc'd) CompletionRecords for this client.
+    CompletionRecordMap completion_records;
+
+    // Garbage collects this client's CompletionRecords for which MustGcRecordFunc returns
+    // true. We use a lambda here so that we can have a single method that GCs and releases
+    // the memory for CompletionRecords based on different policies.
+    //
+    // 'func' should have the following signature:
+    //   bool MyFunction(SequenceNumber seq_no, CompletionRecord* record);
+    //
+    template<class MustGcRecordFunc>
+    void GCCompletionRecords(const std::shared_ptr<kudu::MemTracker>& mem_tracker,
+                             MustGcRecordFunc func);
+
+    std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    // This calculation is shallow and doesn't account for the memory the nested data
+    // structures occupy.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this);
+    }
+  };
+
+  RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
+                            google::protobuf::Message* response,
+                            RpcContext* context);
+
+  typedef std::function<void (const OnGoingRpcInfo&)> HandleOngoingRpcFunc;
+
+  // Helper method to handle the multiple overloads of FailAndRespond. Takes a lambda
+  // that knows what to do with OnGoingRpcInfo in each individual case.
+  void FailAndRespondInternal(const rpc::RequestIdPB& request_id,
+                              HandleOngoingRpcFunc func);
+
+  CompletionRecord* FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id);
+  CompletionRecord* FindCompletionRecordOrDieUnlocked(const RequestIdPB& request_id);
+  std::pair<ClientState*, CompletionRecord*> FindClientStateAndCompletionRecordOrNullUnlocked(
+      const RequestIdPB& request_id);
+
+  // A handler must handle an RPC attempt if:
+  // 1 - It's its own attempt. I.e. it has the same attempt number of the handler.
+  // 2 - It's the driver of the RPC and the attempt has no handler (was attached).
+  bool MustHandleRpc(int64_t handler_attempt_no,
+                     CompletionRecord* completion_record,
+                     const OnGoingRpcInfo& ongoing_rpc) {
+    if (PREDICT_TRUE(ongoing_rpc.handler_attempt_no == handler_attempt_no)) {
+      return true;
+    }
+    if (completion_record->driver_attempt_no == handler_attempt_no) {
+      return ongoing_rpc.handler_attempt_no == NO_HANDLER;
+    }
+    return false;
+  }
+
+  void LogAndTraceAndRespondSuccess(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
+                          const Status& status);
+
+  // Same dump as ToString(), but does not acquire 'lock_'.
+  std::string ToStringUnlocked() const;
+
+  // Body of the GC thread: runs GCResults() periodically until the stop latch fires.
+  void RunGCThread();
+
+  // The memory tracker that tracks this ResultTracker's memory consumption.
+  std::shared_ptr<kudu::MemTracker> mem_tracker_;
+
+  // Lock that protects access to 'clients_' and to the state contained in each
+  // ClientState.
+  // TODO consider a per-ClientState lock if we find this too coarse grained.
+  simple_spinlock lock_;
+
+  typedef MemTrackerAllocator<std::pair<const std::string,
+                                        std::unique_ptr<ClientState>>> ClientStateMapAllocator;
+  typedef std::map<std::string,
+                   std::unique_ptr<ClientState>,
+                   std::less<std::string>,
+                   ClientStateMapAllocator> ClientStateMap;
+
+  ClientStateMap clients_;
+
+  // The thread which runs GC, and a latch to stop it.
+  scoped_refptr<Thread> gc_thread_;
+  CountDownLatch gc_thread_stop_latch_;
+
+  DISALLOW_COPY_AND_ASSIGN(ResultTracker);
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/retriable_rpc.h b/be/src/kudu/rpc/retriable_rpc.h
new file mode 100644
index 0000000..c896027
--- /dev/null
+++ b/be/src/kudu/rpc/retriable_rpc.h
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+namespace internal {
+typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+}
+
+// A base class for retriable RPCs that handles replica picking and retry logic.
+//
+// Template parameters:
+//   Server     - the type of the server that is looked up and passed to the derived
+//                classes on Try(). For instance in the case of WriteRpc it's
+//                RemoteTabletServer.
+//   RequestPB  - the type of the request body stored in 'req_'.
+//   ResponsePB - the type of the response body stored in 'resp_'.
+//
+// TODO(unknown): merge RpcRetrier into this class? Can't be done right now as the retrier is used
+// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker.
+//
+// TODO(unknown): allow to target replicas other than the leader, if needed.
+//
+// TODO(unknown): once we have retry handling on all the RPCs merge this with rpc::Rpc.
+template <class Server, class RequestPB, class ResponsePB>
+class RetriableRpc : public Rpc {
+ public:
+  // 'deadline' and 'messenger' are forwarded to the Rpc base class. The RPC starts
+  // without a sequence number (one is obtained on the first SendRpc()), with zero
+  // attempts, and with no current server.
+  RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
+               const scoped_refptr<RequestTracker>& request_tracker,
+               const MonoTime& deadline,
+               std::shared_ptr<Messenger> messenger)
+      : Rpc(deadline, std::move(messenger)),
+        server_picker_(server_picker),
+        request_tracker_(request_tracker),
+        sequence_number_(RequestTracker::NO_SEQ_NO),
+        num_attempts_(0),
+        // Initialized explicitly: SendRpcCb() reads 'current_', so it must never
+        // hold an indeterminate value.
+        current_(nullptr) {}
+
+  // In debug builds, verifies that the sequence number was released (i.e. reset to
+  // NO_SEQ_NO, which FinishInternal() does) before destruction.
+  virtual ~RetriableRpc() {
+    DCHECK_EQ(sequence_number_, RequestTracker::NO_SEQ_NO);
+  }
+
+  // Performs server lookup/initialization.
+  // If/when the server is looked up and initialized successfully RetriableRpc will call
+  // Try() to actually send the request.
+  void SendRpc() override;
+
+  // The callback to call upon retrieving (or failing to retrieve) a new authn
+  // token. This is the callback that subclasses should call in their custom
+  // implementation of the GetNewAuthnTokenAndRetry() method.
+  void GetNewAuthnTokenAndRetryCb(const Status& status);
+
+ protected:
+  // Subclasses implement this method to actually try the RPC.
+  // The server has been looked up and is ready to be used.
+  virtual void Try(Server* replica, const ResponseCallback& callback) = 0;
+
+  // Subclasses implement this method to analyze 'status', the controller status or
+  // the response and return a RetriableRpcStatus which will then be used
+  // to decide how to proceed (retry or give up).
+  virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0;
+
+  // Subclasses implement this method to perform cleanup and/or final steps.
+  // After this is called the RPC will be no longer retried.
+  virtual void Finish(const Status& status) = 0;
+
+  // Returns 'true' if the RPC is to be scheduled for retry with a new authn token,
+  // 'false' otherwise. For RPCs performed in the context of providing token
+  // for authentication it's necessary to implement this method. The default
+  // implementation returns 'false' meaning the calls returning
+  // INVALID_AUTHENTICATION_TOKEN RPC status are not retried.
+  virtual bool GetNewAuthnTokenAndRetry() {
+    return false;
+  }
+
+  // Request body.
+  RequestPB req_;
+
+  // Response body. Cleared by RetryIfNeeded() before every retry.
+  ResponsePB resp_;
+
+ private:
+  friend class CalculatorServiceRpc;
+
+  // Decides whether to retry the RPC, based on the result of AnalyzeResponse()
+  // and retries if that is the case.
+  // Returns true if the RPC was retried or false otherwise.
+  bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server);
+
+  // Called when the replica has been looked up.
+  void ReplicaFoundCb(const Status& status, Server* server);
+
+  // Called after the RPC was performed.
+  void SendRpcCb(const Status& status) override;
+
+  // Performs final cleanup, after the RPC is done (independently of success).
+  void FinishInternal();
+
+  scoped_refptr<ServerPicker<Server>> server_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
+
+  // NOTE(review): never assigned by the constructor (the messenger argument is moved
+  // into the Rpc base class) nor by any method defined in this header, so this stays
+  // a null pointer unless a friend sets it. Kept only to avoid changing the class
+  // layout; confirm whether it can be removed.
+  std::shared_ptr<Messenger> messenger_;
+
+  // The sequence number for this RPC.
+  internal::SequenceNumber sequence_number_;
+
+  // The number of times this RPC has been attempted
+  int32 num_attempts_;
+
+  // Keeps track of the replica the RPCs were sent to; nullptr when there is none.
+  // TODO Remove this and pass the used replica around. For now we need to keep this as
+  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
+  // being written to.
+  Server* current_;
+};
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc() {
+  // A sequence number is requested only while none is held, so retries of the
+  // same RPC keep reusing the number obtained on the first call.
+  const bool needs_seq_no = (sequence_number_ == RequestTracker::NO_SEQ_NO);
+  if (needs_seq_no) {
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number_));
+  }
+
+  // Ask the picker for the leader; ReplicaFoundCb() continues from there.
+  auto on_replica_found = Bind(&RetriableRpc::ReplicaFoundCb, Unretained(this));
+  server_picker_->PickLeader(on_replica_found, retrier().deadline());
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::GetNewAuthnTokenAndRetryCb(
+    const Status& status) {
+  if (!status.ok()) {
+    // No new token: fall back to the regular delayed-retry sequence, hoping
+    // for better conditions after some time.
+    VLOG(1) << "Failed to get new authn token: " << status.ToString();
+    mutable_retrier()->DelayedRetry(this, status);
+    return;
+  }
+
+  // A new authn token was fetched: reset the controller and send the RPC again.
+  mutable_retrier()->mutable_controller()->Reset();
+  SendRpc();
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
+ const RetriableRpcStatus& result, Server* server) {
+ // Handle the cases where we retry: a case that ends in 'break' reaches the DelayedRetry() below the switch, a case that returns does not.
+ switch (result.result) {
+ case RetriableRpcStatus::SERVICE_UNAVAILABLE:
+ // For writes, always retry the request on the same server in case of the
+ // SERVICE_UNAVAILABLE error (no picker state is changed for this case).
+ break;
+
+ case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE:
+ // TODO(KUDU-1745): not checking for null here results in a crash, since in the case
+ // of a failed master lookup we have no tablet server corresponding to the error.
+ //
+ // But, with the null check, we end up with a relatively tight retry loop
+ // in this scenario whereas we should be backing off. Need to improve
+ // test coverage here to understand why the back-off is not taking effect.
+ if (server != nullptr) {
+ VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+ // Mark the server as failed. As for details on the only existing
+ // implementation of ServerPicker::MarkServerFailed(), see the note on
+ // the MetaCacheServerPicker::MarkServerFailed() method.
+ server_picker_->MarkServerFailed(server, result.status);
+ }
+ break;
+
+ case RetriableRpcStatus::RESOURCE_NOT_FOUND:
+ // The TabletServer was not part of the config serving the tablet.
+ // We mark our tablet cache as stale, forcing a master lookup on the
+ // next attempt. NOTE(review): 'server' is passed on without a null check here.
+ //
+ // TODO(KUDU-1314): Don't backoff the first time we hit this error.
+ server_picker_->MarkResourceNotFound(server);
+ break;
+
+ case RetriableRpcStatus::REPLICA_NOT_LEADER:
+ // The TabletServer was not the leader of the quorum. NOTE(review): 'server' is passed on without a null check here.
+ server_picker_->MarkReplicaNotLeader(server);
+ break;
+
+ case RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN: {
+ // This is a special case for retry: first it's necessary to get a new
+ // authn token and then retry the operation with the new token.
+ if (GetNewAuthnTokenAndRetry()) {
+ // The RPC will be retried by the subclass (see GetNewAuthnTokenAndRetryCb()), so DelayedRetry() is not called here.
+ resp_.Clear();
+ return true;
+ }
+ // Do not retry: the default GetNewAuthnTokenAndRetry() implementation returns false.
+ return false;
+ }
+
+ case RetriableRpcStatus::NON_RETRIABLE_ERROR:
+ if (server != nullptr && result.status.IsTimedOut()) {
+ // For the NON_RETRIABLE_ERROR result in case of TimedOut status,
+ // mark the server as failed. As for details on the only existing
+ // implementation of ServerPicker::MarkServerFailed(), see the note on
+ // the MetaCacheServerPicker::MarkServerFailed() method.
+ VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+ server_picker_->MarkServerFailed(server, result.status);
+ }
+ // Do not retry in the case of non-retriable error.
+ return false;
+
+ default:
+ // For the OK case we should not retry; any other value reaching here trips the DCHECK in debug builds.
+ DCHECK(result.result == RetriableRpcStatus::OK);
+ return false;
+ }
+ resp_.Clear();
+ current_ = nullptr;
+ mutable_retrier()->DelayedRetry(this, result.status);
+ return true;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::FinishInternal() {
+  // Report the sequence number held by this RPC as completed to the tracker...
+  const internal::SequenceNumber completed_seq_no = sequence_number_;
+  request_tracker_->RpcCompleted(completed_seq_no);
+
+  // ...and drop it, which is the state the destructor's DCHECK expects.
+  sequence_number_ = RequestTracker::NO_SEQ_NO;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
+                                                                 Server* server) {
+  // NOTE: 'server' may be nullptr whenever 'status' is not OK.
+  const RetriableRpcStatus lookup_result = AnalyzeResponse(status);
+  if (RetryIfNeeded(lookup_result, server)) {
+    return;
+  }
+
+  // Not retried and not OK: the lookup failed for good, so wrap up the RPC.
+  if (lookup_result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
+    FinishInternal();
+    Finish(lookup_result.status);
+    return;
+  }
+
+  // A replica was found. Fill in the RequestIdPB for this attempt and hand it
+  // to the controller before the call goes out.
+  std::unique_ptr<RequestIdPB> id_pb(new RequestIdPB());
+  id_pb->set_client_id(request_tracker_->client_id());
+  id_pb->set_seq_no(sequence_number_);
+  id_pb->set_first_incomplete_seq_no(request_tracker_->FirstIncomplete());
+  id_pb->set_attempt_no(num_attempts_++);
+  mutable_retrier()->mutable_controller()->SetRequestIdPB(std::move(id_pb));
+
+  DCHECK_EQ(lookup_result.result, RetriableRpcStatus::OK);
+
+  // Remember the target so that SendRpcCb() knows which server was used.
+  current_ = server;
+  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
+}
+
+// Completion callback for one attempt of the RPC: either schedules a retry or
+// finishes the RPC, passing the final status to Finish().
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, current_)) return;
+
+  FinishInternal();
+
+  // From here on out the RPC has either succeeded or suffered a non-retriable
+  // failure.
+  Status final_status = result.status;
+  if (!final_status.ok()) {
+    // Fully qualified on purpose: a header must not depend on a
+    // 'using std::string' leaked by some other include.
+    std::string error_string;
+    if (current_) {
+      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
+    } else {
+      // 'current_' is null when no server is associated with the failure.
+      error_string = "Failed to write to server: (no server available)";
+    }
+    final_status = final_status.CloneAndPrepend(error_string);
+  }
+  Finish(final_status);
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-bench.cc b/be/src/kudu/rpc/rpc-bench.cc
new file mode 100644
index 0000000..d569ea1
--- /dev/null
+++ b/be/src/kudu/rpc/rpc-bench.cc
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/test_util.h"
+
+using std::bind;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+DEFINE_int32(client_threads, 16,
+ "Number of client threads. For the synchronous benchmark, each thread has "
+ "a single outstanding synchronous request at a time. For the async "
+ "benchmark, this determines the number of client reactors.");
+
+DEFINE_int32(async_call_concurrency, 60,
+ "Number of concurrent requests that will be outstanding at a time for the "
+ "async benchmark. The requests are multiplexed across the number of "
+ "reactors specified by the 'client_threads' flag.");
+
+DEFINE_int32(worker_threads, 1,
+ "Number of server worker threads");
+
+DEFINE_int32(server_reactors, 4,
+ "Number of server reactor threads");
+
+DEFINE_int32(run_seconds, 1, "Seconds to run the test");
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+DEFINE_bool(enable_encryption, false, "Whether to enable TLS encryption for rpc-bench");
+
+namespace kudu {
+namespace rpc {
+
+// Test fixture shared by the synchronous and asynchronous RPC benchmarks.
+class RpcBench : public RpcTestBase {
+ public:
+  RpcBench() : should_run_(true), stop_(0) {}
+
+  // Applies the benchmark flags to the fixture and starts the test server.
+  void SetUp() override {
+    OverrideFlagForSlowTests("run_seconds", "10");
+
+    n_server_reactor_threads_ = FLAGS_server_reactors;
+    n_worker_threads_ = FLAGS_worker_threads;
+
+    // Set up server, using the same encryption setting for loopback connections.
+    FLAGS_rpc_encrypt_loopback_connections = FLAGS_enable_encryption;
+    StartTestServerWithGeneratedCode(&server_addr_, FLAGS_enable_encryption);
+  }
+
+  // Logs the benchmark configuration followed by throughput and per-request
+  // CPU / context-switch figures derived from 'elapsed' and 'total_reqs'.
+  // 'sync' selects which client-side settings are reported.
+  void SummarizePerf(CpuTimes elapsed, int total_reqs, bool sync) {
+    const float throughput = static_cast<float>(total_reqs / elapsed.wall_seconds());
+    // The CPU counters are divided by 1000.0 to report them as "us".
+    const float user_us_per_req = static_cast<float>(elapsed.user / 1000.0 / total_reqs);
+    const float sys_us_per_req = static_cast<float>(elapsed.system / 1000.0 / total_reqs);
+    const float switches_per_req = static_cast<float>(elapsed.context_switches) / total_reqs;
+
+    const char* mode_name = sync ? "Sync" : "Async";
+    LOG(INFO) << "Mode: " << mode_name;
+    if (!sync) {
+      LOG(INFO) << "Client reactors: " << FLAGS_client_threads;
+      LOG(INFO) << "Call concurrency: " << FLAGS_async_call_concurrency;
+    } else {
+      LOG(INFO) << "Client threads: " << FLAGS_client_threads;
+    }
+
+    LOG(INFO) << "Worker threads: " << FLAGS_worker_threads;
+    LOG(INFO) << "Server reactors: " << FLAGS_server_reactors;
+    LOG(INFO) << "Encryption: " << FLAGS_enable_encryption;
+    LOG(INFO) << "----------------------------------";
+    LOG(INFO) << "Reqs/sec: " << throughput;
+    LOG(INFO) << "User CPU per req: " << user_us_per_req << "us";
+    LOG(INFO) << "Sys CPU per req: " << sys_us_per_req << "us";
+    LOG(INFO) << "Ctx Sw. per req: " << switches_per_req;
+  }
+
+ protected:
+  friend class ClientThread;
+  friend class ClientAsyncWorkload;
+
+  // Address of the test server, filled in by SetUp().
+  Sockaddr server_addr_;
+  // Clients keep issuing requests while this is non-zero.
+  Atomic32 should_run_;
+  // Counted down by each async workload once it stops issuing requests.
+  CountDownLatch stop_;
+};
+
+// One synchronous benchmark client: a thread issuing Add() calls back-to-back
+// until the owning RpcBench clears 'should_run_'.
+class ClientThread {
+ public:
+  explicit ClientThread(RpcBench *bench) : bench_(bench), request_count_(0) {}
+
+  // Spawns the thread, which executes Run().
+  void Start() { thread_.reset(new thread(&ClientThread::Run, this)); }
+
+  // Blocks until the thread spawned by Start() exits.
+  void Join() { thread_->join(); }
+
+  // Thread body: one synchronous Add() call per loop iteration, each with its
+  // own controller and a 10 second timeout. Any failed call or wrong sum aborts.
+  void Run() {
+    shared_ptr<Messenger> messenger = bench_->CreateMessenger("Client");
+    CalculatorServiceProxy proxy(messenger, bench_->server_addr_);
+
+    AddRequestPB req;
+    AddResponsePB resp;
+    for (; Acquire_Load(&bench_->should_run_); request_count_++) {
+      req.set_x(request_count_);
+      req.set_y(request_count_);
+
+      RpcController controller;
+      controller.set_timeout(MonoDelta::FromSeconds(10));
+      CHECK_OK(proxy.Add(req, &resp, &controller));
+      CHECK_EQ(req.x() + req.y(), resp.result());
+    }
+  }
+
+  unique_ptr<thread> thread_;
+  RpcBench *bench_;
+  // Number of completed calls; read by the test after Join().
+  int request_count_;
+};
+
+
+// Synchronous benchmark: FLAGS_client_threads threads each issue successful
+// RPC calls for FLAGS_run_seconds seconds; the results are then summarized.
+TEST_F(RpcBench, BenchmarkCalls) {
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+
+  vector<unique_ptr<ClientThread>> clients;
+  for (int i = 0; i < FLAGS_client_threads; i++) {
+    unique_ptr<ClientThread> client(new ClientThread(this));
+    client->Start();
+    clients.push_back(std::move(client));
+  }
+
+  // Let the clients run, then tell them to stop.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
+  Release_Store(&should_run_, false);
+
+  // Each client's counter is read only after its thread has been joined.
+  int total_reqs = 0;
+  for (const auto& client : clients) {
+    client->Join();
+    total_reqs += client->request_count_;
+  }
+  sw.stop();
+
+  SummarizePerf(sw.elapsed(), total_reqs, true);
+}
+
+// One asynchronous benchmark workload: a chain of AddAsync() calls in which the
+// completion callback of each call issues the next one.
+class ClientAsyncWorkload {
+ public:
+  ClientAsyncWorkload(RpcBench *bench, shared_ptr<Messenger> messenger)
+      : bench_(bench),
+        messenger_(std::move(messenger)),
+        request_count_(0) {
+    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_));
+    controller_.set_timeout(MonoDelta::FromSeconds(10));
+  }
+
+  // Checks the result of the previous call (if there was one), then either
+  // issues the next call or, once the bench asked to stop, counts down the
+  // bench's 'stop_' latch and ends the chain.
+  void CallOneRpc() {
+    const bool has_previous_call = request_count_ > 0;
+    if (has_previous_call) {
+      CHECK_OK(controller_.status());
+      CHECK_EQ(req_.x() + req_.y(), resp_.result());
+    }
+
+    if (!Acquire_Load(&bench_->should_run_)) {
+      bench_->stop_.CountDown();
+      return;
+    }
+
+    controller_.Reset();
+    // Both operands are the pre-increment request count.
+    const uint32_t operand = request_count_++;
+    req_.set_x(operand);
+    req_.set_y(operand);
+    proxy_->AddAsync(req_,
+                     &resp_,
+                     &controller_,
+                     bind(&ClientAsyncWorkload::CallOneRpc, this));
+  }
+
+  // Kicks off the call chain.
+  void Start() { CallOneRpc(); }
+
+  RpcBench *bench_;
+  shared_ptr<Messenger> messenger_;
+  unique_ptr<CalculatorServiceProxy> proxy_;
+  // Number of calls issued so far (incremented before each AddAsync()).
+  uint32_t request_count_;
+  RpcController controller_;
+  AddRequestPB req_;
+  AddResponsePB resp_;
+};
+
+// Asynchronous benchmark: FLAGS_async_call_concurrency call chains are spread
+// round-robin over FLAGS_client_threads client messengers.
+TEST_F(RpcBench, BenchmarkCallsAsync) {
+  const int num_messengers = FLAGS_client_threads;
+  const int num_workloads = FLAGS_async_call_concurrency;
+
+  vector<shared_ptr<Messenger>> messengers;
+  for (int i = 0; i < num_messengers; i++) {
+    messengers.push_back(CreateMessenger("Client"));
+  }
+
+  vector<unique_ptr<ClientAsyncWorkload>> workloads;
+  for (int i = 0; i < num_workloads; i++) {
+    const shared_ptr<Messenger>& messenger = messengers[i % num_messengers];
+    workloads.emplace_back(new ClientAsyncWorkload(this, messenger));
+  }
+
+  // Every workload counts the latch down once when it stops issuing calls.
+  stop_.Reset(num_workloads);
+
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+
+  for (const auto& workload : workloads) {
+    workload->Start();
+  }
+
+  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
+  Release_Store(&should_run_, false);
+
+  // The stopwatch is stopped before waiting for the workloads to wind down.
+  sw.stop();
+
+  stop_.Wait();
+  int total_reqs = 0;
+  for (const auto& workload : workloads) {
+    total_reqs += workload->request_count_;
+  }
+
+  SummarizePerf(sw.elapsed(), total_reqs, false);
+}
+
+} // namespace rpc
+} // namespace kudu
+