You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:26 UTC

[10/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_method.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.h b/be/src/kudu/rpc/remote_method.h
new file mode 100644
index 0000000..5b78dad
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REMOTE_METHOD_H_
+#define KUDU_RPC_REMOTE_METHOD_H_
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethodPB;
+
+// Simple class that acts as a container for a fully qualified remote RPC name
+// and converts to/from RemoteMethodPB.
+// This class is also copyable and assignable for convenience reasons.
+class RemoteMethod {
+ public:
+  RemoteMethod() {}
+  RemoteMethod(std::string service_name, const std::string method_name);
+  std::string service_name() const { return service_name_; }
+  std::string method_name() const { return method_name_; }
+
+  // Encode/decode to/from 'pb'.
+  void FromPB(const RemoteMethodPB& pb);
+  void ToPB(RemoteMethodPB* pb) const;
+
+  std::string ToString() const;
+
+ private:
+  std::string service_name_;
+  std::string method_name_;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_REMOTE_METHOD_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.cc b/be/src/kudu/rpc/remote_user.cc
new file mode 100644
index 0000000..50e3fcd
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/remote_user.h"
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+string RemoteUser::ToString() const {
+  string ret;
+  strings::SubstituteAndAppend(&ret, "{username='$0'", username_);
+  if (principal_) {
+    strings::SubstituteAndAppend(&ret, ", principal='$0'", *principal_);
+  }
+  ret.append("}");
+  return ret;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.h b/be/src/kudu/rpc/remote_user.h
new file mode 100644
index 0000000..7dc0590
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <boost/optional.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Server-side view of the remote authenticated user.
+//
+// This class may be read by multiple threads concurrently after
+// its initialization during RPC negotiation.
+class RemoteUser {
+ public:
+  // The method by which the remote user authenticated.
+  enum Method {
+    // No authentication (authentication was not required by the server
+    // and the user provided a username but it was not validated in any way)
+    UNAUTHENTICATED,
+    // Kerberos-authenticated.
+    KERBEROS,
+    // Authenticated by a Kudu authentication token.
+    AUTHN_TOKEN,
+    // Authenticated by a client certificate.
+    CLIENT_CERT
+  };
+
+  Method authenticated_by() const {
+    return authenticated_by_;
+  }
+
+  const std::string& username() const { return username_; }
+
+  boost::optional<std::string> principal() const {
+    return principal_;
+  }
+
+  void SetAuthenticatedByKerberos(std::string username,
+                                  std::string principal) {
+    authenticated_by_ = KERBEROS;
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+  }
+
+  void SetUnauthenticated(std::string username) {
+    authenticated_by_ = UNAUTHENTICATED;
+    username_ = std::move(username);
+    principal_ = boost::none;
+  }
+
+  void SetAuthenticatedByClientCert(std::string username,
+                                    boost::optional<std::string> principal) {
+    authenticated_by_ = CLIENT_CERT;
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+  }
+
+  void SetAuthenticatedByToken(std::string username) {
+    authenticated_by_ = AUTHN_TOKEN;
+    username_ = std::move(username);
+    principal_ = boost::none;
+  }
+
+  // Returns a string representation of the object.
+  std::string ToString() const;
+
+ private:
+  // The real username of the remote user. In the case of a Kerberos
+  // principal, this has already been mapped to a local username.
+  // TODO(todd): actually do the above mapping.
+  std::string username_;
+
+  // The full principal of the remote user. This is only set in the
+  // case of a strong-authenticated user.
+  boost::optional<std::string> principal_;
+
+  Method authenticated_by_ = UNAUTHENTICATED;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker-test.cc b/be/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..89ea8a2
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+  const int MAX = 10;
+
+  scoped_refptr<RequestTracker> tracker_(new RequestTracker("test_client"));
+
+  // A new tracker should have no incomplete RPCs
+  RequestTracker::SequenceNumber seq_no = tracker_->FirstIncomplete();
+  ASSERT_EQ(seq_no, RequestTracker::NO_SEQ_NO);
+
+  vector<RequestTracker::SequenceNumber> generated_seq_nos;
+
+  // Generate MAX in flight RPCs, making sure they are correctly returned.
+  for (int i = 0; i < MAX; i++) {
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Now we should get a first incomplete.
+  ASSERT_EQ(generated_seq_nos[0], tracker_->FirstIncomplete());
+
+  // Marking 'first_incomplete' as done, should advance the first incomplete.
+  tracker_->RpcCompleted(tracker_->FirstIncomplete());
+
+  ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete());
+
+  // Marking a 'middle' rpc, should not advance 'first_incomplete'.
+  tracker_->RpcCompleted(generated_seq_nos[5]);
+  ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete());
+
+  // Marking half the rpc as complete should advance FirstIncomplete.
+  // Note that this also tests that RequestTracker::RpcCompleted() is idempotent, i.e. that
+  // marking the same sequence number as complete twice is a no-op.
+  for (int i = 0; i < MAX / 2; i++) {
+    tracker_->RpcCompleted(generated_seq_nos[i]);
+  }
+
+  ASSERT_EQ(generated_seq_nos[6], tracker_->FirstIncomplete());
+
+  for (int i = MAX / 2; i <= MAX; i++) {
+    ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return
+  // Status::NotFound() again.
+  for (auto seq_no : generated_seq_nos) {
+    tracker_->RpcCompleted(seq_no);
+  }
+
+  ASSERT_EQ(tracker_->FirstIncomplete(), RequestTracker::NO_SEQ_NO);
+}
+
+} // namespace rpc
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.cc b/be/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..1958664
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+const RequestTracker::SequenceNumber RequestTracker::NO_SEQ_NO = -1;
+
+RequestTracker::RequestTracker(const string& client_id)
+    : client_id_(client_id),
+      next_(0) {}
+
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  *seq_no = next_;
+  InsertOrDie(&incomplete_rpcs_, *seq_no);
+  next_++;
+  return Status::OK();
+}
+
+RequestTracker::SequenceNumber RequestTracker::FirstIncomplete() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (incomplete_rpcs_.empty()) return NO_SEQ_NO;
+  return *incomplete_rpcs_.begin();
+}
+
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  incomplete_rpcs_.erase(seq_no);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.h b/be/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..99f8d6c
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are differet from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+  static const RequestTracker::SequenceNumber NO_SEQ_NO;
+  explicit RequestTracker(const std::string& client_id);
+
+  // Creates a new, unique, sequence number.
+  // Sequence numbers are assigned in increasing integer order.
+  // Returns Status::OK() and sets 'seq_no' if it was able to generate a sequence number
+  // or returns Status::ServiceUnavailable() if too many RPCs are in-flight, in which case
+  // the caller should try again later.
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns the sequence number of the first incomplete RPC.
+  // If there is no incomplete RPC returns NO_SEQ_NO.
+  SequenceNumber FirstIncomplete();
+
+  // Marks the rpc with 'seq_no' as completed.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker.
+  const std::string& client_id() { return client_id_; }
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/response_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/response_callback.h b/be/src/kudu/rpc/response_callback.h
new file mode 100644
index 0000000..8c4fc03
--- /dev/null
+++ b/be/src/kudu/rpc/response_callback.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RESPONSE_CALLBACK_H
+#define KUDU_RPC_RESPONSE_CALLBACK_H
+
+#include <boost/function.hpp>
+
+namespace kudu {
+namespace rpc {
+
+typedef boost::function<void()> ResponseCallback;
+
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/result_tracker.cc b/be/src/kudu/rpc/result_tracker.cc
new file mode 100644
index 0000000..11ff8d2
--- /dev/null
+++ b/be/src/kudu/rpc/result_tracker.cc
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/result_tracker.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/trace.h"
+
+DEFINE_int64(remember_clients_ttl_ms, 3600 * 1000 /* 1 hour */,
+    "Maximum amount of time, in milliseconds, the server \"remembers\" a client for the "
+    "purpose of caching its responses. After this period without hearing from it, the "
+    "client is no longer remembered and the memory occupied by its responses is reclaimed. "
+    "Retries of requests older than 'remember_clients_ttl_ms' are treated as new "
+    "ones.");
+TAG_FLAG(remember_clients_ttl_ms, advanced);
+
+DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */,
+    "Maximum amount of time, in milliseconds, the server \"remembers\" a response to a "
+    "specific request for a client. After this period has elapsed, the response may have "
+    "been garbage collected and the client might get a response indicating the request is "
+    "STALE.");
+TAG_FLAG(remember_responses_ttl_ms, advanced);
+
+DEFINE_int64(result_tracker_gc_interval_ms, 1000,
+    "Interval at which the result tracker will look for entries to GC.");
+TAG_FLAG(result_tracker_gc_interval_ms, hidden);
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::Message;
+using kudu::MemTracker;
+using rpc::InboundCall;
+using std::move;
+using std::lock_guard;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+// This tracks the size changes of anything that has a memory_footprint() method.
+// It must be instantiated before the updates, and it makes sure that the MemTracker
+// is updated on scope exit.
+template <class T>
+struct ScopedMemTrackerUpdater {
+  ScopedMemTrackerUpdater(MemTracker* tracker_, const T* tracked_)
+      : tracker(tracker_),
+        tracked(tracked_),
+        memory_before(tracked->memory_footprint()),
+        cancelled(false) {
+  }
+
+  ~ScopedMemTrackerUpdater() {
+    if (cancelled) return;
+    tracker->Release(memory_before - tracked->memory_footprint());
+  }
+
+  void Cancel() {
+    cancelled = true;
+  }
+
+  MemTracker* tracker;
+  const T* tracked;
+  int64_t memory_before;
+  bool cancelled;
+};
+
+ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
+    : mem_tracker_(std::move(mem_tracker)),
+      clients_(ClientStateMap::key_compare(),
+               ClientStateMapAllocator(mem_tracker_)),
+      gc_thread_stop_latch_(1) {}
+
+ResultTracker::~ResultTracker() {
+  if (gc_thread_) {
+    gc_thread_stop_latch_.CountDown();
+    gc_thread_->Join();
+  }
+
+  lock_guard<simple_spinlock> l(lock_);
+  // Release all the memory for the stuff we'll delete on destruction.
+  for (auto& client_state : clients_) {
+    client_state.second->GCCompletionRecords(
+        mem_tracker_, [] (SequenceNumber, CompletionRecord*){ return true; });
+    mem_tracker_->Release(client_state.second->memory_footprint());
+  }
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
+                                                Message* response,
+                                                RpcContext* context) {
+  lock_guard<simple_spinlock> l(lock_);
+  return TrackRpcUnlocked(request_id, response, context);
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id,
+                                                        Message* response,
+                                                        RpcContext* context) {
+  ClientState* client_state = ComputeIfAbsent(
+      &clients_,
+      request_id.client_id(),
+      [&]{
+        unique_ptr<ClientState> client_state(new ClientState(mem_tracker_));
+        mem_tracker_->Consume(client_state->memory_footprint());
+        client_state->stale_before_seq_no = request_id.first_incomplete_seq_no();
+        return client_state;
+      })->get();
+
+  client_state->last_heard_from = MonoTime::Now();
+
+  // If the arriving request is older than our per-client GC watermark, report its
+  // staleness to the client.
+  if (PREDICT_FALSE(request_id.seq_no() < client_state->stale_before_seq_no)) {
+    if (context) {
+      context->call_->RespondFailure(
+          ErrorStatusPB::ERROR_REQUEST_STALE,
+          Status::Incomplete(Substitute("Request with id { $0 } is stale.",
+                                        SecureShortDebugString(request_id))));
+      delete context;
+    }
+    return RpcState::STALE;
+  }
+
+  // GC records according to the client's first incomplete watermark.
+  client_state->GCCompletionRecords(
+      mem_tracker_,
+      [&] (SequenceNumber seq_no, CompletionRecord* completion_record) {
+        return completion_record->state != RpcState::IN_PROGRESS &&
+            seq_no < request_id.first_incomplete_seq_no();
+      });
+
+  auto result = ComputeIfAbsentReturnAbsense(
+      &client_state->completion_records,
+      request_id.seq_no(),
+      [&]{
+        unique_ptr<CompletionRecord> completion_record(new CompletionRecord(
+            RpcState::IN_PROGRESS, request_id.attempt_no()));
+        mem_tracker_->Consume(completion_record->memory_footprint());
+        return completion_record;
+      });
+
+  CompletionRecord* completion_record = result.first->get();
+  ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
+
+  if (PREDICT_TRUE(result.second)) {
+    // When a follower is applying an operation it doesn't have a response yet, and it won't
+    // have a context, so only set them if they exist.
+    if (response != nullptr) {
+      completion_record->ongoing_rpcs.push_back({response,
+                                                 DCHECK_NOTNULL(context),
+                                                 request_id.attempt_no()});
+    }
+    return RpcState::NEW;
+  }
+
+  completion_record->last_updated = MonoTime::Now();
+  switch (completion_record->state) {
+    case RpcState::COMPLETED: {
+      // If the RPC is COMPLETED and the request originates from a client (context, response are
+      // non-null) copy the response and reply immediately. If there is no context/response
+      // do nothing.
+      if (context != nullptr) {
+        DCHECK_NOTNULL(response)->CopyFrom(*completion_record->response);
+        context->call_->RespondSuccess(*response);
+        delete context;
+      }
+      return RpcState::COMPLETED;
+    }
+    case RpcState::IN_PROGRESS: {
+      // If the RPC is IN_PROGRESS check if there is a context and, if so, attach it
+      // so that the rpc gets the same response when the original one completes.
+      if (context != nullptr) {
+        completion_record->ongoing_rpcs.push_back({DCHECK_NOTNULL(response),
+                                                   context,
+                                                   NO_HANDLER});
+      }
+      return RpcState::IN_PROGRESS;
+    }
+    default:
+      LOG(FATAL) << "Wrong state: " << completion_record->state;
+      // dummy return to avoid warnings
+      return RpcState::STALE;
+  }
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> l(lock_);
+  RpcState state = TrackRpcUnlocked(request_id, nullptr, nullptr);
+
+  if (state != RpcState::IN_PROGRESS) return state;
+
+  CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+  ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
+
+  // ... if we did find a CompletionRecord change the driver and return true.
+  completion_record->driver_attempt_no = request_id.attempt_no();
+  completion_record->ongoing_rpcs.push_back({nullptr,
+                                             nullptr,
+                                             request_id.attempt_no()});
+
+  // Since we changed the driver of the RPC, return NEW, so that the caller knows
+  // to store the result.
+  return RpcState::NEW;
+}
+
+bool ResultTracker::IsCurrentDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> l(lock_);
+  CompletionRecord* completion_record = FindCompletionRecordOrNullUnlocked(request_id);
+
+  // If we couldn't find the CompletionRecord, someone might have called FailAndRespond() so
+  // just return false.
+  if (completion_record == nullptr) return false;
+
+  // ... if we did find a CompletionRecord return true if we're the driver or false
+  // otherwise.
+  return completion_record->driver_attempt_no == request_id.attempt_no();
+}
+
+void ResultTracker::LogAndTraceAndRespondSuccess(RpcContext* context,
+                                                 const Message& msg) {
+  InboundCall* call = context->call_;
+  VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC success "
+      "response for " << call->ToString() << ":" << std::endl << SecureDebugString(msg);
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(msg),
+                         "trace", context->trace()->DumpToString());
+  call->RespondSuccess(msg);
+  delete context;
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context,
+                                       const Message& msg) {
+  InboundCall* call = context->call_;
+  VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure "
+      "response for " << call->ToString() << ": " << SecureDebugString(msg);
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(msg),
+                         "trace", context->trace()->DumpToString());
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context,
+                                       ErrorStatusPB_RpcErrorCodePB err,
+                                       const Status& status) {
+  InboundCall* call = context->call_;
+  VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure "
+      "response for " << call->ToString() << ": " << status.ToString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "status", status.ToString(),
+                         "trace", context->trace()->DumpToString());
+}
+
+ResultTracker::CompletionRecord* ResultTracker::FindCompletionRecordOrDieUnlocked(
+    const RequestIdPB& request_id) {
+  ClientState* client_state = DCHECK_NOTNULL(FindPointeeOrNull(clients_, request_id.client_id()));
+  return DCHECK_NOTNULL(FindPointeeOrNull(client_state->completion_records, request_id.seq_no()));
+}
+
+pair<ResultTracker::ClientState*, ResultTracker::CompletionRecord*>
+ResultTracker::FindClientStateAndCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) {
+  ClientState* client_state = FindPointeeOrNull(clients_, request_id.client_id());
+  CompletionRecord* completion_record = nullptr;
+  if (client_state != nullptr) {
+    completion_record = FindPointeeOrNull(client_state->completion_records, request_id.seq_no());
+  }
+  return make_pair(client_state, completion_record);
+}
+
+ResultTracker::CompletionRecord*
+ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) {
+  return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second;
+}
+
+void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
+                                               const Message* response) {
+  vector<OnGoingRpcInfo> to_respond;
+  {
+    lock_guard<simple_spinlock> l(lock_);
+
+    CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+    ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
+
+    CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
+        << "Called RecordCompletionAndRespond() from an executor identified with an "
+        << "attempt number that was not marked as the driver for the RPC. RequestId: "
+        << SecureShortDebugString(request_id) << "\nTracker state:\n " << ToStringUnlocked();
+    DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
+    completion_record->response.reset(DCHECK_NOTNULL(response)->New());
+    completion_record->response->CopyFrom(*response);
+    completion_record->state = RpcState::COMPLETED;
+    completion_record->last_updated = MonoTime::Now();
+
+    CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no());
+
+    int64_t handler_attempt_no = request_id.attempt_no();
+
+    // Go through the ongoing RPCs and reply to each one.
+    for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+         orpc_iter != completion_record->ongoing_rpcs.rend();) {
+
+      const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+      if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+        if (ongoing_rpc.context != nullptr) {
+          to_respond.push_back(ongoing_rpc);
+        }
+        ++orpc_iter;
+        orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+            completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+      } else {
+        ++orpc_iter;
+      }
+    }
+  }
+
+  // Respond outside of holding the lock. This reduces lock contention and also
+  // means that we will have fully updated our memory tracking before responding,
+  // which makes testing easier.
+  for (auto& ongoing_rpc : to_respond) {
+    if (PREDICT_FALSE(ongoing_rpc.response != response)) {
+      ongoing_rpc.response->CopyFrom(*response);
+    }
+    LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response);
+  }
+}
+
+void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
+                                           HandleOngoingRpcFunc func) {
+  vector<OnGoingRpcInfo> to_handle;
+  {
+    lock_guard<simple_spinlock> l(lock_);
+    auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+    if (PREDICT_FALSE(state_and_record.first == nullptr)) {
+      LOG(FATAL) << "Couldn't find ClientState for request: " << SecureShortDebugString(request_id)
+                 << ". \nTracker state:\n" << ToStringUnlocked();
+    }
+
+    CompletionRecord* completion_record = state_and_record.second;
+
+    // It is possible for this method to be called for an RPC that was never actually
+    // tracked (though RecordCompletionAndRespond() can't). One such case is when a
+    // follower transaction fails on the TransactionManager, for some reason, before it
+    // was tracked. The CompletionCallback still calls this method. In this case, do
+    // nothing.
+    if (completion_record == nullptr) {
+      return;
+    }
+
+    ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
+    completion_record->last_updated = MonoTime::Now();
+
+    int64_t seq_no = request_id.seq_no();
+    int64_t handler_attempt_no = request_id.attempt_no();
+
+    // If we're copying from a client originated response we need to take care to reply
+    // to that call last, otherwise we'll lose 'response', before we go through all the
+    // CompletionRecords.
+    for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+         orpc_iter != completion_record->ongoing_rpcs.rend();) {
+
+      const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+      if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+        to_handle.push_back(ongoing_rpc);
+        ++orpc_iter;
+        orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+            completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+      } else {
+        ++orpc_iter;
+      }
+    }
+
+    // If we're the last ones trying this and the state is not completed,
+    // delete the completion record.
+    if (completion_record->ongoing_rpcs.size() == 0
+        && completion_record->state != RpcState::COMPLETED) {
+      cr_updater.Cancel();
+      unique_ptr<CompletionRecord> completion_record =
+          EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
+      mem_tracker_->Release(completion_record->memory_footprint());
+    }
+  }
+
+  // Wait until outside the lock to do the heavy-weight work.
+  for (auto& ongoing_rpc : to_handle) {
+    if (ongoing_rpc.context != nullptr) {
+      func(ongoing_rpc);
+      delete ongoing_rpc.context;
+    }
+  }
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id, Message* response) {
+  auto func = [&](const OnGoingRpcInfo& ongoing_rpc) {
+    // In the common case RPCs are just executed once so, in that case, avoid an extra
+    // copy of the response.
+    if (PREDICT_FALSE(ongoing_rpc.response != response)) {
+      ongoing_rpc.response->CopyFrom(*response);
+    }
+    LogAndTraceFailure(ongoing_rpc.context, *response);
+    ongoing_rpc.context->call_->RespondSuccess(*response);
+  };
+  FailAndRespondInternal(request_id, func);
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
+  auto func = [&](const OnGoingRpcInfo& ongoing_rpc) {
+    LogAndTraceFailure(ongoing_rpc.context, err, status);
+    ongoing_rpc.context->call_->RespondFailure(err, status);
+  };
+  FailAndRespondInternal(request_id, func);
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   int error_ext_id, const string& message,
+                                   const Message& app_error_pb) {
+  auto func = [&](const OnGoingRpcInfo& ongoing_rpc) {
+    LogAndTraceFailure(ongoing_rpc.context, app_error_pb);
+    ongoing_rpc.context->call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+  };
+  FailAndRespondInternal(request_id, func);
+}
+
+void ResultTracker::StartGCThread() {
+  CHECK(!gc_thread_);
+  CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread,
+                          this, &gc_thread_));
+}
+
+void ResultTracker::RunGCThread() {
+  while (!gc_thread_stop_latch_.WaitFor(MonoDelta::FromMilliseconds(
+             FLAGS_result_tracker_gc_interval_ms))) {
+    GCResults();
+  }
+}
+
+void ResultTracker::GCResults() {
+  lock_guard<simple_spinlock> l(lock_);
+  MonoTime now = MonoTime::Now();
+  // Calculate the instants before which we'll start GCing ClientStates and CompletionRecords.
+  MonoTime time_to_gc_clients_from = now;
+  time_to_gc_clients_from.AddDelta(
+      MonoDelta::FromMilliseconds(-FLAGS_remember_clients_ttl_ms));
+  MonoTime time_to_gc_responses_from = now;
+  time_to_gc_responses_from.AddDelta(
+      MonoDelta::FromMilliseconds(-FLAGS_remember_responses_ttl_ms));
+
+  // Now go through the ClientStates. If we haven't heard from a client in a while
+  // GC it and all its completion records (making sure there isn't actually one in progress first).
+  // If we've heard from a client recently, but some of its responses are old, GC those responses.
+  for (auto iter = clients_.begin(); iter != clients_.end();) {
+    auto& client_state = iter->second;
+    if (client_state->last_heard_from < time_to_gc_clients_from) {
+      // Client should be GCed.
+      bool ongoing_request = false;
+      client_state->GCCompletionRecords(
+          mem_tracker_,
+          [&] (SequenceNumber, CompletionRecord* completion_record) {
+            if (PREDICT_FALSE(completion_record->state == RpcState::IN_PROGRESS)) {
+              ongoing_request = true;
+              return false;
+            }
+            return true;
+          });
+      // Don't delete the client state if there is still a request in execution.
+      if (PREDICT_FALSE(ongoing_request)) {
+        ++iter;
+        continue;
+      }
+      mem_tracker_->Release(client_state->memory_footprint());
+      iter = clients_.erase(iter);
+    } else {
+      // Client can't be GCed, but its calls might be GCable.
+      iter->second->GCCompletionRecords(
+          mem_tracker_,
+          [&] (SequenceNumber, CompletionRecord* completion_record) {
+            return completion_record->state != RpcState::IN_PROGRESS &&
+                completion_record->last_updated < time_to_gc_responses_from;
+          });
+      ++iter;
+    }
+  }
+}
+
+string ResultTracker::ToString() {
+  lock_guard<simple_spinlock> l(lock_);
+  return ToStringUnlocked();
+}
+
+string ResultTracker::ToStringUnlocked() const {
+  string result = Substitute("ResultTracker[this: $0, Num. Client States: $1, Client States:\n",
+                             this, clients_.size());
+  for (auto& cs : clients_) {
+    SubstituteAndAppend(&result, Substitute("\n\tClient: $0, $1", cs.first, cs.second->ToString()));
+  }
+  result.append("]");
+  return result;
+}
+
+template<class MustGcRecordFunc>
+void ResultTracker::ClientState::GCCompletionRecords(
+    const shared_ptr<kudu::MemTracker>& mem_tracker,
+    MustGcRecordFunc must_gc_record_func) {
+  ScopedMemTrackerUpdater<ClientState> updater(mem_tracker.get(), this);
+  for (auto iter = completion_records.begin(); iter != completion_records.end();) {
+    if (must_gc_record_func(iter->first, iter->second.get())) {
+      mem_tracker->Release(iter->second->memory_footprint());
+      SequenceNumber deleted_seq_no = iter->first;
+      iter = completion_records.erase(iter);
+      // Each time we GC a response, update 'stale_before_seq_no'.
+      // This will allow to answer clients that their responses are stale if we get
+      // a request with a sequence number lower than or equal to this one.
+      stale_before_seq_no = std::max(deleted_seq_no + 1, stale_before_seq_no);
+      continue;
+    }
+    // Since we store completion records in order, if we found one that shouldn't be GCed,
+    // don't GC anything after it.
+    return;
+  }
+}
+
+string ResultTracker::ClientState::ToString() const {
+  auto since_last_heard =
+      MonoTime::Now().GetDeltaSince(last_heard_from);
+  string result = Substitute("Client State[Last heard from: $0s ago, "
+                             "$1 CompletionRecords:",
+                             since_last_heard.ToString(),
+                             completion_records.size());
+  for (auto& completion_record : completion_records) {
+    SubstituteAndAppend(&result, Substitute("\n\tCompletion Record: $0, $1",
+                                            completion_record.first,
+                                            completion_record.second->ToString()));
+  }
+  result.append("\t]");
+  return result;
+}
+
+string ResultTracker::CompletionRecord::ToString() const {
+  string result = Substitute("Completion Record[State: $0, Driver: $1, "
+                             "Cached response: $2, $3 OngoingRpcs:",
+                             state,
+                             driver_attempt_no,
+                             response ? SecureShortDebugString(*response) : "None",
+                             ongoing_rpcs.size());
+  for (auto& orpc : ongoing_rpcs) {
+    SubstituteAndAppend(&result, Substitute("\n\t$0", orpc.ToString()));
+  }
+  result.append("\t\t]");
+  return result;
+}
+
+string ResultTracker::OnGoingRpcInfo::ToString() const {
+  return Substitute("OngoingRpc[Handler: $0, Context: $1, Response: $2]",
+                    handler_attempt_no, context,
+                    response ? SecureShortDebugString(*response) : "NULL");
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/result_tracker.h b/be/src/kudu/rpc/result_tracker.h
new file mode 100644
index 0000000..f629d7a
--- /dev/null
+++ b/be/src/kudu/rpc/result_tracker.h
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/thread.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // protobuf
+} // google
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+
+// A ResultTracker for RPC results.
+//
+// The ResultTracker is responsible for tracking the results of RPCs and making sure that
+// client calls with the same client ID and sequence number (first attempt and subsequent retries)
+// are executed exactly once.
+//
+// In most cases, the use of ResultTracker is internal to the RPC system: RPCs are tracked when
+// they first arrive, before service methods are called, and calls to ResultTracker to store
+// responses are performed internally by RpcContext. The exception is when an RPC is replicated
+// across multiple servers, such as with writes, in which case direct interaction with the result
+// tracker is required so as to cache responses on replicas which did not receive the RPC directly
+// from the client.
+//
+// Throughout this header and elsewhere we use the following terms:
+//
+// RPC - The operation that a client or another server wants to execute on this server. The client
+//       might attempt one RPC many times, for instance if failures or timeouts happen.
+// Attempt - Each individual attempt of an RPC on the server.
+// Handler - A thread executing an attempt. Usually there is only one handler that executes the
+//           first attempt of an RPC and, when it completes, replies to its own attempt and to all
+//           other attempts that might have arrived after it started.
+// Driver - Only important in cases where there might be multiple handlers (e.g. in replicated
+//          RPCs). In these cases there might be two handlers executing the same RPC, corresponding
+//          to different attempts. Since the RPC must be executed exactly once, only one of the
+//          handlers must be selected as the "driver" and actually perform the operation.
+//
+// If a client wishes to track the result of a given RPC it must send on the RPC header
+// a RequestId with the following information:
+//
+//       Client ID - Uniquely identifies a single client. All the RPCs originating from the same
+//                   client must have the same ID.
+// Sequence number - Uniquely identifies a single RPC, even across retries to multiple servers, for
+//                   replicated RPCs. All retries of the same RPC must have the same sequence
+//                   number.
+//  Attempt number - Uniquely identifies each retry of the same RPC. All retries of the same RPC
+//                   must have different attempt numbers.
+//
+// When a call first arrives from the client the RPC subsystem will call TrackRpc() which
+// will return the state of the RPC in the form of an RpcState enum.
+//
+// If the ResultTracker returns NEW, this signals that it's the first time the server has heard
+// of the RPC and that the corresponding server function should be executed.
+//
+// If anything other than NEW is returned it means that the call has either previously completed or
+// is in the process of being executed. In this case the caller should _not_ execute the function
+// corresponding to the RPC. The ResultTracker itself will take care of responding to the client
+// appropriately. If the RPC was already completed, the ResultTracker replies to the client
+// immediately. If the RPC is still ongoing, the attempt gets "attached" to the ongoing one and will
+// receive the same response when its handler finishes.
+//
+// If handling of the RPC is successful, RecordCompletionAndRespond() must be called
+// to register successful completion, in which case all pending or future RPCs with the same
+// sequence number, from the same client, will receive the same response.
+//
+// On the other hand, if execution of the server function is not successful then one of
+// the FailAndRespond() methods should be called, causing all _pending_ attempts to receive the same
+// error. However this error is not stored, any future attempt with the same sequence number and
+// same client ID will be given a new chance to execute, as if it it had never been tried before.
+// This gives the client a chance to either retry (if the failure reason is transient) or give up.
+//
+// ============================================================================
+// RPCs with multiple handlers
+// ============================================================================
+//
+// Some RPCs results are tracked by single server, i.e. they correspond to the modification of an
+// unreplicated resource and are unpersisted. For those no additional care needs to be taken, the
+// first attempt will be the only handler, and subsequent attempts will receive the response when
+// that first attempt is done.
+// However some RPCs are replicated across servers, using consensus, and thus can have multiple
+// handlers executing different attempts at the same time, e.g. one handler from a client
+// originating retry, and one from a previous leader originating update.
+//
+// In this case we need to make sure that the following invariants are enforced:
+// - Only one handler can actually record a response, the "driver" handler.
+// - Only one handler must respond to "attached" attempts.
+// - Each handler replies to their own RPCs, to avoid races. That is, a live handler should
+//   not mutate another live handler's response/context.
+//
+// This is achieved by naming one handler the "driver" of the RPC and making sure that only
+// the driver can successfully complete it, i.e. call RecordCompletionAndRespond().
+//
+// In order to make sure there is only one driver, there must be an _external_ serialization
+// point, before the final response is produced, after which only one of the handlers will
+// be marked as the driver. For instance, for writes, this serialization point is in
+// TransactionDriver, in a synchronized block where a logic such as this one happens (here
+// in pseudo-ish code):
+//
+// {
+//   lock_guard<simple_spinlock> l(lock_);
+//   if (follower_transaction) {
+//     result_tracker_->TrackRpcOrChangeDriver(request_id);
+//     continue_with_transaction();
+//   } else if (client_transaction) {
+//     bool is_still_driver = result_tracker_->IsCurrentDriver(request_id);
+//     if (is_still_driver) continue_with_transaction();
+//     else abort_transaction();
+//   }
+// }
+//
+// This class is thread safe.
+class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
+ public:
+  typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+  static const int NO_HANDLER = -1;
+  // Enum returned by TrackRpc that reflects the state of the RPC.
+  enum RpcState {
+    // The RPC is new.
+    NEW,
+    // The RPC has previously completed and the same response has been sent
+    // to the client.
+    COMPLETED,
+    // The RPC is currently in-progress and, when it completes, the same response
+    // will be sent to the client.
+    IN_PROGRESS,
+    // The RPC's state is stale, meaning it's older than our per-client garbage
+    // collection watermark and we do not recall the original response.
+    STALE
+  };
+
+  explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker);
+  ~ResultTracker();
+
+  // Tracks the RPC and returns its current state.
+  //
+  // If the RpcState == NEW the caller is supposed to actually start executing the RPC.
+  // The caller still owns the passed 'response' and 'context'.
+  //
+  // If the RpcState is anything else all remaining actions will be taken care of internally,
+  // i.e. the caller no longer needs to execute the RPC and this takes ownership of the passed
+  // 'response' and 'context'.
+  RpcState TrackRpc(const RequestIdPB& request_id,
+                    google::protobuf::Message* response,
+                    RpcContext* context);
+
+  // Used to track RPC attempts which originate from other replicas, and which may race with
+  // client originated ones.
+  // Tracks the RPC if it is untracked or changes the current driver of this RPC, i.e. sets the
+  // attempt number in 'request_id' as the driver of the RPC, if it is tracked and IN_PROGRESS.
+  RpcState TrackRpcOrChangeDriver(const RequestIdPB& request_id);
+
+  // Checks if the attempt at an RPC identified by 'request_id' is the current driver of the
+  // RPC. That is, if the attempt number in 'request_id' corresponds to the attempt marked
+  // as the driver of this RPC, either by initially getting NEW from TrackRpc() or by
+  // explicit driver change with ChangeDriver().
+  bool IsCurrentDriver(const RequestIdPB& request_id);
+
+  // Records the completion of sucessful operation.
+  // This will respond to all RPCs from the same client with the same sequence_number.
+  // The response will be stored so that any future retries of this RPC get the same response.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt indentified by 'request_id' is the current driver
+  // of the RPC.
+  void RecordCompletionAndRespond(const RequestIdPB& request_id,
+                                  const google::protobuf::Message* response);
+
+  // Responds to all RPCs identified by 'client_id' and 'sequence_number' with the same response,
+  // but doesn't actually store the response.
+  // This should be called when the RPC failed validation or if some transient error occurred.
+  // Based on the response the client can then decide whether to retry the RPC (which will
+  // be treated as a new one) or to give up.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt indentified by 'request_id' is the current driver
+  // of the RPC.
+  void FailAndRespond(const RequestIdPB& request_id,
+                      google::protobuf::Message* response);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      ErrorStatusPB_RpcErrorCodePB err, const Status& status);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      int error_ext_id, const std::string& message,
+                      const google::protobuf::Message& app_error_pb);
+
+  // Start a background thread which periodically runs GCResults().
+  // This thread is automatically stopped in the destructor.
+  //
+  // Must be called at most once.
+  void StartGCThread();
+
+  // Runs time-based garbage collection on the results this result tracker is caching.
+  // When garbage collection runs, it goes through all ClientStates and:
+  // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
+  //   requests are in progress, GCs the ClientState and all its CompletionRecords.
+  // - If a ClientState is newer than the 'remember_clients_ttl_ms' flag, goes
+  //   through all CompletionRecords and:
+  //   - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag,
+  //     GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
+  //
+  // Typically this is invoked from an internal thread started by 'StartGCThread()'.
+  void GCResults();
+
+  string ToString();
+
+ private:
+  // Information about client originated ongoing RPCs.
+  // The lifecycle of 'response' and 'context' is managed by the RPC layer.
+  struct OnGoingRpcInfo {
+    google::protobuf::Message* response;
+    RpcContext* context;
+    int64_t handler_attempt_no;
+
+    std::string ToString() const;
+  };
+  // A completion record for an IN_PROGRESS or COMPLETED RPC.
+  struct CompletionRecord {
+    CompletionRecord(RpcState state, int64_t driver_attempt_no)
+        : state(state),
+          driver_attempt_no(driver_attempt_no),
+          last_updated(MonoTime::Now()) {
+    }
+
+    // The current state of the RPC.
+    RpcState state;
+
+    // The attempt number that is/was "driving" this RPC.
+    int64_t driver_attempt_no;
+
+    // The timestamp of the last CompletionRecord update.
+    MonoTime last_updated;
+
+    // The cached response, if this RPC is in COMPLETED state.
+    std::unique_ptr<google::protobuf::Message> response;
+
+    // The set of ongoing RPCs that correspond to this record.
+    std::vector<OnGoingRpcInfo> ongoing_rpcs;
+
+    std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this)
+          + (ongoing_rpcs.capacity() > 0 ? kudu_malloc_usable_size(ongoing_rpcs.data()) : 0)
+          + (response.get() != nullptr ? response->SpaceUsed() : 0);
+    }
+  };
+
+  // The state corresponding to a single client.
+  struct ClientState {
+    typedef MemTrackerAllocator<
+        std::pair<const SequenceNumber,
+                  std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator;
+    typedef std::map<SequenceNumber,
+                     std::unique_ptr<CompletionRecord>,
+                     std::less<SequenceNumber>,
+                     CompletionRecordMapAllocator> CompletionRecordMap;
+
+    explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
+        : stale_before_seq_no(0),
+          completion_records(CompletionRecordMap::key_compare(),
+                             CompletionRecordMapAllocator(std::move(mem_tracker))) {}
+
+    // The last time we've heard from this client.
+    MonoTime last_heard_from;
+
+    // The sequence number of the first response we remember for this client.
+    // All sequence numbers before this one are considered STALE.
+    SequenceNumber stale_before_seq_no;
+
+    // The (un gc'd) CompletionRecords for this client.
+    CompletionRecordMap completion_records;
+
+    // Garbage collects this client's CompletionRecords for which MustGcRecordFunc returns
+    // true. We use a lambda here so that we can have a single method that GCs and releases
+    // the memory for CompletionRecords based on different policies.
+    //
+    // 'func' should have the following signature:
+    //   bool MyFunction(SequenceNumber seq_no, CompletionRecord* record);
+    //
+    template<class MustGcRecordFunc>
+    void GCCompletionRecords(const std::shared_ptr<kudu::MemTracker>& mem_tracker,
+                             MustGcRecordFunc func);
+
+    std::string ToString() const;
+
+    // Calculates the memory footprint of this struct.
+    // This calculation is shallow and doesn't account for the memory the nested data
+    // structures occupy.
+    int64_t memory_footprint() const {
+      return kudu_malloc_usable_size(this);
+    }
+  };
+
+  RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
+                            google::protobuf::Message* response,
+                            RpcContext* context);
+
+  typedef std::function<void (const OnGoingRpcInfo&)> HandleOngoingRpcFunc;
+
+  // Helper method to handle the multiple overloads of FailAndRespond. Takes a lambda
+  // that knows what to do with OnGoingRpcInfo in each individual case.
+  void FailAndRespondInternal(const rpc::RequestIdPB& request_id,
+                              HandleOngoingRpcFunc func);
+
+  CompletionRecord* FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id);
+  CompletionRecord* FindCompletionRecordOrDieUnlocked(const RequestIdPB& request_id);
+  std::pair<ClientState*, CompletionRecord*> FindClientStateAndCompletionRecordOrNullUnlocked(
+      const RequestIdPB& request_id);
+
+  // A handler must handle an RPC attempt if:
+  // 1 - It's its own attempt. I.e. it has the same attempt number of the handler.
+  // 2 - It's the driver of the RPC and the attempt has no handler (was attached).
+  bool MustHandleRpc(int64_t handler_attempt_no,
+                     CompletionRecord* completion_record,
+                     const OnGoingRpcInfo& ongoing_rpc) {
+    if (PREDICT_TRUE(ongoing_rpc.handler_attempt_no == handler_attempt_no)) {
+      return true;
+    }
+    if (completion_record->driver_attempt_no == handler_attempt_no) {
+      return ongoing_rpc.handler_attempt_no == NO_HANDLER;
+    }
+    return false;
+  }
+
+  void LogAndTraceAndRespondSuccess(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
+                          const Status& status);
+
+  std::string ToStringUnlocked() const;
+
+  void RunGCThread();
+
+  // The memory tracker that tracks this ResultTracker's memory consumption.
+  std::shared_ptr<kudu::MemTracker> mem_tracker_;
+
+  // Lock that protects access to 'clients_' and to the state contained in each
+  // ClientState.
+  // TODO consider a per-ClientState lock if we find this too coarse grained.
+  simple_spinlock lock_;
+
+  typedef MemTrackerAllocator<std::pair<const std::string,
+                                        std::unique_ptr<ClientState>>> ClientStateMapAllocator;
+  typedef std::map<std::string,
+                   std::unique_ptr<ClientState>,
+                   std::less<std::string>,
+                   ClientStateMapAllocator> ClientStateMap;
+
+  ClientStateMap clients_;
+
+  // The thread which runs GC, and a latch to stop it.
+  scoped_refptr<Thread> gc_thread_;
+  CountDownLatch gc_thread_stop_latch_;
+
+  DISALLOW_COPY_AND_ASSIGN(ResultTracker);
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/retriable_rpc.h b/be/src/kudu/rpc/retriable_rpc.h
new file mode 100644
index 0000000..c896027
--- /dev/null
+++ b/be/src/kudu/rpc/retriable_rpc.h
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+namespace internal {
+typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+}
+
+// A base class for retriable RPCs that handles replica picking and retry logic.
+//
+// The 'Server' template parameter refers to the type of the server that will be looked up
+// and passed to the derived classes on Try(). For instance in the case of WriteRpc it's
+// RemoteTabletServer.
+//
+// TODO(unknown): merge RpcRetrier into this class? Can't be done right now as the retrier is used
+// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker.
+//
+// TODO(unknown): allow to target replicas other than the leader, if needed.
+//
+// TODO(unknown): once we have retry handling on all the RPCs merge this with rpc::Rpc.
+template <class Server, class RequestPB, class ResponsePB>
+class RetriableRpc : public Rpc {
+ public:
+  RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
+               const scoped_refptr<RequestTracker>& request_tracker,
+               const MonoTime& deadline,
+               std::shared_ptr<Messenger> messenger)
+      : Rpc(deadline, std::move(messenger)),
+        server_picker_(server_picker),
+        request_tracker_(request_tracker),
+        sequence_number_(RequestTracker::NO_SEQ_NO),
+        num_attempts_(0) {}
+
+  virtual ~RetriableRpc() {
+    DCHECK_EQ(sequence_number_, RequestTracker::NO_SEQ_NO);
+  }
+
+  // Performs server lookup/initialization.
+  // If/when the server is looked up and initialized successfully RetriableRpc will call
+  // Try() to actually send the request.
+  void SendRpc() override;
+
+  // The callback to call upon retrieving (of failing to retrieve) a new authn
+  // token. This is the callback that subclasses should call in their custom
+  // implementation of the GetNewAuthnTokenAndRetry() method.
+  void GetNewAuthnTokenAndRetryCb(const Status& status);
+
+ protected:
+  // Subclasses implement this method to actually try the RPC.
+  // The server been looked up and is ready to be used.
+  virtual void Try(Server* replica, const ResponseCallback& callback) = 0;
+
+  // Subclasses implement this method to analyze 'status', the controller status or
+  // the response and return a RetriableRpcStatus which will then be used
+  // to decide how to proceed (retry or give up).
+  virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0;
+
+  // Subclasses implement this method to perform cleanup and/or final steps.
+  // After this is called the RPC will be no longer retried.
+  virtual void Finish(const Status& status) = 0;
+
+  // Returns 'true' if the RPC is to scheduled for retry with a new authn token,
+  // 'false' otherwise. For RPCs performed in the context of providing token
+  // for authentication it's necessary to implement this method. The default
+  // implementation returns 'false' meaning the calls returning
+  // INVALID_AUTHENTICATION_TOKEN RPC status are not retried.
+  virtual bool GetNewAuthnTokenAndRetry() {
+    return false;
+  }
+
+  // Request body.
+  RequestPB req_;
+
+  // Response body.
+  ResponsePB resp_;
+
+ private:
+  friend class CalculatorServiceRpc;
+
+  // Decides whether to retry the RPC, based on the result of AnalyzeResponse()
+  // and retries if that is the case.
+  // Returns true if the RPC was retried or false otherwise.
+  bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server);
+
+  // Called when the replica has been looked up.
+  void ReplicaFoundCb(const Status& status, Server* server);
+
+  // Called after the RPC was performed.
+  void SendRpcCb(const Status& status) override;
+
+  // Performs final cleanup, after the RPC is done (independently of success).
+  void FinishInternal();
+
+  scoped_refptr<ServerPicker<Server>> server_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
+  std::shared_ptr<Messenger> messenger_;
+
+  // The sequence number for this RPC.
+  internal::SequenceNumber sequence_number_;
+
+  // The number of times this RPC has been attempted
+  int32 num_attempts_;
+
+  // Keeps track of the replica the RPCs were sent to.
+  // TODO Remove this and pass the used replica around. For now we need to keep this as
+  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
+  // being written to.
+  Server* current_;
+};
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc()  {
+  if (sequence_number_ == RequestTracker::NO_SEQ_NO) {
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number_));
+  }
+  server_picker_->PickLeader(Bind(&RetriableRpc::ReplicaFoundCb,
+                                  Unretained(this)),
+                             retrier().deadline());
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::GetNewAuthnTokenAndRetryCb(
+    const Status& status) {
+  if (status.ok()) {
+    // Perform the RPC call with the newly fetched authn token.
+    mutable_retrier()->mutable_controller()->Reset();
+    SendRpc();
+  } else {
+    // Back to the retry sequence, hoping for better conditions after some time.
+    VLOG(1) << "Failed to get new authn token: " << status.ToString();
+    mutable_retrier()->DelayedRetry(this, status);
+  }
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
+    const RetriableRpcStatus& result, Server* server) {
+  // Handle the cases where we retry.
+  switch (result.result) {
+    case RetriableRpcStatus::SERVICE_UNAVAILABLE:
+      // For writes, always retry the request on the same server in case of the
+      // SERVICE_UNAVAILABLE error.
+      break;
+
+    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE:
+      // TODO(KUDU-1745): not checking for null here results in a crash, since in the case
+      // of a failed master lookup we have no tablet server corresponding to the error.
+      //
+      // But, with the null check, we end up with a relatively tight retry loop
+      // in this scenario whereas we should be backing off. Need to improve
+      // test coverage here to understand why the back-off is not taking effect.
+      if (server != nullptr) {
+        VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+        // Mark the server as failed. As for details on the only existing
+        // implementation of ServerPicker::MarkServerFailed(), see the note on
+        // the MetaCacheServerPicker::MarkServerFailed() method.
+        server_picker_->MarkServerFailed(server, result.status);
+      }
+      break;
+
+    case RetriableRpcStatus::RESOURCE_NOT_FOUND:
+      // The TabletServer was not part of the config serving the tablet.
+      // We mark our tablet cache as stale, forcing a master lookup on the
+      // next attempt.
+      //
+      // TODO(KUDU-1314): Don't backoff the first time we hit this error.
+      server_picker_->MarkResourceNotFound(server);
+      break;
+
+    case RetriableRpcStatus::REPLICA_NOT_LEADER:
+      // The TabletServer was not the leader of the quorum.
+      server_picker_->MarkReplicaNotLeader(server);
+      break;
+
+    case RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN: {
+      // This is a special case for retry: first it's necessary to get a new
+      // authn token and then retry the operation with the new token.
+      if (GetNewAuthnTokenAndRetry()) {
+        // The RPC will be retried.
+        resp_.Clear();
+        return true;
+      }
+      // Do not retry.
+      return false;
+    }
+
+    case RetriableRpcStatus::NON_RETRIABLE_ERROR:
+      if (server != nullptr && result.status.IsTimedOut()) {
+        // For the NON_RETRIABLE_ERROR result in case of TimedOut status,
+        // mark the server as failed. As for details on the only existing
+        // implementation of ServerPicker::MarkServerFailed(), see the note on
+        // the MetaCacheServerPicker::MarkServerFailed() method.
+        VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
+        server_picker_->MarkServerFailed(server, result.status);
+      }
+      // Do not retry in the case of non-retriable error.
+      return false;
+
+    default:
+      // For the OK case we should not retry.
+      DCHECK(result.result == RetriableRpcStatus::OK);
+      return false;
+  }
+  resp_.Clear();
+  current_ = nullptr;
+  mutable_retrier()->DelayedRetry(this, result.status);
+  return true;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::FinishInternal() {
+  // Mark the RPC as completed and set the sequence number to NO_SEQ_NO to make
+  // sure we're in the appropriate state before destruction.
+  request_tracker_->RpcCompleted(sequence_number_);
+  sequence_number_ = RequestTracker::NO_SEQ_NO;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
+                                                                 Server* server) {
+  // NOTE: 'server' here may be nullptr in the case that status is not OK!
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, server)) return;
+
+  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
+    FinishInternal();
+    Finish(result.status);
+    return;
+  }
+
+  // We successfully found a replica, so prepare the RequestIdPB before we send out the call.
+  std::unique_ptr<RequestIdPB> request_id(new RequestIdPB());
+  request_id->set_client_id(request_tracker_->client_id());
+  request_id->set_seq_no(sequence_number_);
+  request_id->set_first_incomplete_seq_no(request_tracker_->FirstIncomplete());
+  request_id->set_attempt_no(num_attempts_++);
+
+  mutable_retrier()->mutable_controller()->SetRequestIdPB(std::move(request_id));
+
+  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
+  current_ = server;
+  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
+}
+
+template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
+  RetriableRpcStatus result = AnalyzeResponse(status);
+  if (RetryIfNeeded(result, current_)) return;
+
+  FinishInternal();
+
+  // From here on out the RPC has either succeeded of suffered a non-retriable
+  // failure.
+  Status final_status = result.status;
+  if (!final_status.ok()) {
+    string error_string;
+    if (current_) {
+      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
+    } else {
+      error_string = "Failed to write to server: (no server available)";
+    }
+    final_status = final_status.CloneAndPrepend(error_string);
+  }
+  Finish(final_status);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-bench.cc b/be/src/kudu/rpc/rpc-bench.cc
new file mode 100644
index 0000000..d569ea1
--- /dev/null
+++ b/be/src/kudu/rpc/rpc-bench.cc
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/test_util.h"
+
+using std::bind;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+DEFINE_int32(client_threads, 16,
+             "Number of client threads. For the synchronous benchmark, each thread has "
+             "a single outstanding synchronous request at a time. For the async "
+             "benchmark, this determines the number of client reactors.");
+
+DEFINE_int32(async_call_concurrency, 60,
+             "Number of concurrent requests that will be outstanding at a time for the "
+             "async benchmark. The requests are multiplexed across the number of "
+             "reactors specified by the 'client_threads' flag.");
+
+DEFINE_int32(worker_threads, 1,
+             "Number of server worker threads");
+
+DEFINE_int32(server_reactors, 4,
+             "Number of server reactor threads");
+
+DEFINE_int32(run_seconds, 1, "Seconds to run the test");
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+DEFINE_bool(enable_encryption, false, "Whether to enable TLS encryption for rpc-bench");
+
+namespace kudu {
+namespace rpc {
+
+class RpcBench : public RpcTestBase {
+ public:
+  RpcBench()
+      : should_run_(true),
+        stop_(0)
+  {}
+
+  void SetUp() override {
+    OverrideFlagForSlowTests("run_seconds", "10");
+
+    n_worker_threads_ = FLAGS_worker_threads;
+    n_server_reactor_threads_ = FLAGS_server_reactors;
+
+    // Set up server.
+    FLAGS_rpc_encrypt_loopback_connections = FLAGS_enable_encryption;
+    StartTestServerWithGeneratedCode(&server_addr_, FLAGS_enable_encryption);
+  }
+
+  void SummarizePerf(CpuTimes elapsed, int total_reqs, bool sync) {
+    float reqs_per_second = static_cast<float>(total_reqs / elapsed.wall_seconds());
+    float user_cpu_micros_per_req = static_cast<float>(elapsed.user / 1000.0 / total_reqs);
+    float sys_cpu_micros_per_req = static_cast<float>(elapsed.system / 1000.0 / total_reqs);
+    float csw_per_req = static_cast<float>(elapsed.context_switches) / total_reqs;
+
+    LOG(INFO) << "Mode:            " << (sync ? "Sync" : "Async");
+    if (sync) {
+      LOG(INFO) << "Client threads:   " << FLAGS_client_threads;
+    } else {
+      LOG(INFO) << "Client reactors:  " << FLAGS_client_threads;
+      LOG(INFO) << "Call concurrency: " << FLAGS_async_call_concurrency;
+    }
+
+    LOG(INFO) << "Worker threads:   " << FLAGS_worker_threads;
+    LOG(INFO) << "Server reactors:  " << FLAGS_server_reactors;
+    LOG(INFO) << "Encryption:       " << FLAGS_enable_encryption;
+    LOG(INFO) << "----------------------------------";
+    LOG(INFO) << "Reqs/sec:         " << reqs_per_second;
+    LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us";
+    LOG(INFO) << "Sys CPU per req:  " << sys_cpu_micros_per_req << "us";
+    LOG(INFO) << "Ctx Sw. per req:  " << csw_per_req;
+
+  }
+
+ protected:
+  friend class ClientThread;
+  friend class ClientAsyncWorkload;
+
+  Sockaddr server_addr_;
+  Atomic32 should_run_;
+  CountDownLatch stop_;
+};
+
+class ClientThread {
+ public:
+  explicit ClientThread(RpcBench *bench)
+    : bench_(bench),
+      request_count_(0) {
+  }
+
+  void Start() {
+    thread_.reset(new thread(&ClientThread::Run, this));
+  }
+
+  void Join() {
+    thread_->join();
+  }
+
+  void Run() {
+    shared_ptr<Messenger> client_messenger = bench_->CreateMessenger("Client");
+
+    CalculatorServiceProxy p(client_messenger, bench_->server_addr_);
+
+    AddRequestPB req;
+    AddResponsePB resp;
+    while (Acquire_Load(&bench_->should_run_)) {
+      req.set_x(request_count_);
+      req.set_y(request_count_);
+      RpcController controller;
+      controller.set_timeout(MonoDelta::FromSeconds(10));
+      CHECK_OK(p.Add(req, &resp, &controller));
+      CHECK_EQ(req.x() + req.y(), resp.result());
+      request_count_++;
+    }
+  }
+
+  unique_ptr<thread> thread_;
+  RpcBench *bench_;
+  int request_count_;
+};
+
+
+// Test making successful RPC calls.
+TEST_F(RpcBench, BenchmarkCalls) {
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+
+  vector<unique_ptr<ClientThread>> threads;
+  for (int i = 0; i < FLAGS_client_threads; i++) {
+    threads.emplace_back(new ClientThread(this));
+    threads.back()->Start();
+  }
+
+  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
+  Release_Store(&should_run_, false);
+
+  int total_reqs = 0;
+
+  for (auto& thr : threads) {
+    thr->Join();
+    total_reqs += thr->request_count_;
+  }
+  sw.stop();
+
+  SummarizePerf(sw.elapsed(), total_reqs, true);
+}
+
+class ClientAsyncWorkload {
+ public:
+  ClientAsyncWorkload(RpcBench *bench, shared_ptr<Messenger> messenger)
+    : bench_(bench),
+      messenger_(std::move(messenger)),
+      request_count_(0) {
+    controller_.set_timeout(MonoDelta::FromSeconds(10));
+    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_));
+  }
+
+  void CallOneRpc() {
+    if (request_count_ > 0) {
+      CHECK_OK(controller_.status());
+      CHECK_EQ(req_.x() + req_.y(), resp_.result());
+    }
+    if (!Acquire_Load(&bench_->should_run_)) {
+      bench_->stop_.CountDown();
+      return;
+    }
+    controller_.Reset();
+    req_.set_x(request_count_);
+    req_.set_y(request_count_);
+    request_count_++;
+    proxy_->AddAsync(req_,
+                     &resp_,
+                     &controller_,
+                     bind(&ClientAsyncWorkload::CallOneRpc, this));
+  }
+
+  void Start() {
+    CallOneRpc();
+  }
+
+  RpcBench *bench_;
+  shared_ptr<Messenger> messenger_;
+  unique_ptr<CalculatorServiceProxy> proxy_;
+  uint32_t request_count_;
+  RpcController controller_;
+  AddRequestPB req_;
+  AddResponsePB resp_;
+};
+
+TEST_F(RpcBench, BenchmarkCallsAsync) {
+  int threads = FLAGS_client_threads;
+  int concurrency = FLAGS_async_call_concurrency;
+
+  vector<shared_ptr<Messenger>> messengers;
+  for (int i = 0; i < threads; i++) {
+    messengers.push_back(CreateMessenger("Client"));
+  }
+
+  vector<unique_ptr<ClientAsyncWorkload>> workloads;
+  for (int i = 0; i < concurrency; i++) {
+    workloads.emplace_back(
+        new ClientAsyncWorkload(this, messengers[i % threads]));
+  }
+
+  stop_.Reset(concurrency);
+
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+
+  for (int i = 0; i < concurrency; i++) {
+    workloads[i]->Start();
+  }
+
+  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
+  Release_Store(&should_run_, false);
+
+  sw.stop();
+
+  stop_.Wait();
+  int total_reqs = 0;
+  for (int i = 0; i < concurrency; i++) {
+    total_reqs += workloads[i]->request_count_;
+  }
+
+  SummarizePerf(sw.elapsed(), total_reqs, false);
+}
+
+} // namespace rpc
+} // namespace kudu
+