You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/16 06:08:34 UTC

[3/4] kudu git commit: Add time/watermark based garbage collection to ResultTracker

Add time/watermark based garbage collection to ResultTracker

This adds time and watermark based garbage collection to the
ResultTracker.  Regarding time GC, there are two ttl's, a client ttl and
a response ttl.

After the response ttl has elapsed, we garbage collect responses
but the ResultTracker remembers that it doesn't know them, so if
the client retries a request older than that it gets a meaningful
error back, stating that the request is stale.

After the client ttl period without hearing back from a client,
we GC the client state entirely, meaning all requests from that
client will be treated as new.

Regarding watermark GC, the algorithm is simple: we trust the client to
tell us its lowest incomplete sequence number, and we GC everything
below that.

This adds a simple test that makes sure this basically works, and adds a
multithreaded test that runs GC at the same time as writes.

NOTE: this does not wire the time-based garbage collection process into
the server itself -- it's currently only triggered by the included
tests.

Original patch by David.
Some changes by Todd.

Change-Id: I2c8e7b7191ca14842a31b64813ed498bdf626fa8
Reviewed-on: http://gerrit.cloudera.org:8080/3628
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fc09ddcd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fc09ddcd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fc09ddcd

Branch: refs/heads/master
Commit: fc09ddcd7771b1cdffe8bcc8a112866c1d02d2e1
Parents: 5ef3743
Author: dralves <dr...@apache.org>
Authored: Mon Jul 11 14:01:35 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 16 05:58:54 2016 +0000

----------------------------------------------------------------------
 .../exactly_once_writes-itest.cc                |  14 +-
 src/kudu/rpc/CMakeLists.txt                     |   2 +-
 src/kudu/rpc/exactly_once_rpc-test.cc           | 586 +++++++++++++++++++
 src/kudu/rpc/result_tracker.cc                  | 144 ++++-
 src/kudu/rpc/result_tracker.h                   |  46 +-
 src/kudu/rpc/retriable_rpc.h                    |   1 +
 src/kudu/rpc/rpc-stress-test.cc                 | 402 -------------
 src/kudu/rpc/rpc_header.proto                   |   6 +-
 src/kudu/rpc/service_if.cc                      |   3 +
 src/kudu/tablet/tablet_bootstrap.cc             |  12 +-
 .../tablet/transactions/transaction_driver.cc   |   9 +-
 11 files changed, 797 insertions(+), 428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 657604b..f74da83 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -78,12 +78,21 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
   std::unique_ptr<TabletServerServiceProxy> proxy(new TabletServerServiceProxy(client_messenger,
                                                                                address));
   for (int i = 0; i < num_batches; i++) {
+    // Wait for all of the other writer threads to finish their attempts of the prior
+    // batch before continuing on to the next one. This has two important effects:
+    //   1) we are more likely to trigger races where multiple attempts of the same sequence
+    //      number arrive concurrently.
+    //   2) we set 'first_incomplete_seq_no' to our current sequence number, which means
+    //      that each time we start a new batch, we allow garbage collection of the result
+    //      tracker entries for the prior batches. So, if we let other threads continue to
+    //      retry the prior batch while we moved on to the next batch, they might get a
+    //      'STALE' error response.
     barrier->Wait();
     WriteRequestPB request;
     request.set_tablet_id(tablet_id_);
     SchemaToPB(schema, request.mutable_schema());
 
-    // For 1/3 of the batches peform an empty write. This will make sure that we also stress
+    // For 1/3 of the batches, perform an empty write. This will make sure that we also stress
     // the path where writes aren't serialized by row locks.
     if (i % 3 != 0) {
       for (int j = 0; j < batch_size; j++) {
@@ -95,6 +104,7 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
 
     int64_t num_attempts = 0;
     int64_t base_attempt_idx = thread_idx * num_batches + i;
+
     while (true) {
       controller.Reset();
       WriteResponsePB response;
@@ -103,7 +113,7 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
       request_id->set_client_id("test_client");
       request_id->set_seq_no(i);
       request_id->set_attempt_no(base_attempt_idx * kMaxAttempts + num_attempts);
-      request_id->set_first_incomplete_seq_no(rpc::RequestTracker::NO_SEQ_NO);
+      request_id->set_first_incomplete_seq_no(i);
 
       controller.SetRequestIdPB(std::move(request_id));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index e76d96a..4a471d4 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -114,11 +114,11 @@ target_link_libraries(rtest_krpc
 
 # Tests
 set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(exactly_once_rpc-test)
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
 ADD_KUDU_TEST(reactor-test)
 ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
-ADD_KUDU_TEST(rpc-stress-test)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)
 ADD_KUDU_TEST(sasl_rpc-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
new file mode 100644
index 0000000..3f3f59e
--- /dev/null
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -0,0 +1,586 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc-test-base.h"
+
+DECLARE_int64(remember_clients_ttl_ms);
+DECLARE_int64(remember_responses_ttl_ms);
+
+using std::atomic_int;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+
+const char* kClientId = "test-client";
+
+void AddRequestId(RpcController* controller,
+                  const std::string& client_id,
+                  ResultTracker::SequenceNumber sequence_number,
+                  int64_t attempt_no) {
+  unique_ptr<RequestIdPB> request_id(new RequestIdPB());
+  request_id->set_client_id(client_id);
+  request_id->set_seq_no(sequence_number);
+  request_id->set_attempt_no(attempt_no);
+  request_id->set_first_incomplete_seq_no(sequence_number);
+  controller->SetRequestIdPB(std::move(request_id));
+}
+
+class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
+ public:
+  explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {}
+
+  void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override {
+    callback.Run(Status::OK(), proxy_);
+  }
+
+  void MarkServerFailed(CalculatorServiceProxy*, const Status&) override {}
+  void MarkReplicaNotLeader(CalculatorServiceProxy*) override {}
+  void MarkResourceNotFound(CalculatorServiceProxy*) override {}
+
+ private:
+  CalculatorServiceProxy* proxy_;
+};
+
+} // anonymous namespace
+
+class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
+                                                 ExactlyOnceRequestPB,
+                                                 ExactlyOnceResponsePB> {
+ public:
+  CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker,
+                       const scoped_refptr<RequestTracker>& request_tracker,
+                       const MonoTime& deadline,
+                       const shared_ptr<Messenger>& messenger,
+                       int value,
+                       CountDownLatch* latch,
+                       int server_sleep = 0)
+      : RetriableRpc(server_picker, request_tracker, deadline, messenger), latch_(latch) {
+    req_.set_value_to_add(value);
+    req_.set_randomly_fail(true);
+    req_.set_sleep_for_ms(server_sleep);
+  }
+
+  void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
+    server->AddExactlyOnceAsync(req_,
+                                &resp_,
+                                mutable_retrier()->mutable_controller(),
+                                callback);
+  }
+
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override {
+    // We shouldn't get errors from the server/rpc system since we set a high timeout.
+    CHECK_OK(rpc_cb_status);
+
+    if (!mutable_retrier()->controller().status().ok()) {
+      CHECK(mutable_retrier()->controller().status().IsRemoteError());
+      if (mutable_retrier()->controller().error_response()->code()
+          == ErrorStatusPB::ERROR_REQUEST_STALE) {
+        return { RetriableRpcStatus::NON_RETRIABLE_ERROR,
+              mutable_retrier()->controller().status() };
+      } else {
+        return { RetriableRpcStatus::SERVER_BUSY, mutable_retrier()->controller().status() };
+      }
+    }
+
+    // If the controller is not finished we're in the ReplicaFoundCb() callback.
+    // Return ok to proceed with the call to the server.
+    if (!mutable_retrier()->mutable_controller()->finished()) {
+      return { RetriableRpcStatus::OK, Status::OK() };
+    }
+
+    // If we've received a response in the past, all following responses must
+    // match.
+    if (!successful_response_.IsInitialized()) {
+      successful_response_.CopyFrom(resp_);
+    } else {
+      CHECK_EQ(successful_response_.DebugString(), resp_.DebugString());
+    }
+
+    if (sometimes_retry_successful_) {
+      // Still report errors, with some probability. This will cause requests to
+      // be retried. Since the requests were originally successful we should get
+      // the same reply back.
+      int random = rand() % 4;
+      switch (random) {
+        case 0: return { RetriableRpcStatus::SERVER_BUSY, Status::RemoteError("") };
+        case 1: return { RetriableRpcStatus::RESOURCE_NOT_FOUND, Status::RemoteError("") };
+        case 2: return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, Status::RemoteError("") };
+        case 3: return { RetriableRpcStatus::OK, Status::OK() };
+        default: LOG(FATAL) << "Unexpected value";
+      }
+    }
+    return { RetriableRpcStatus::OK, Status::OK() };
+  }
+
+  void Finish(const Status& status) override {
+    CHECK_OK(status);
+    latch_->CountDown();
+    delete this;
+  }
+
+  std::string ToString() const override { return "test-rpc"; }
+  CountDownLatch* latch_;
+  ExactlyOnceResponsePB successful_response_;
+  bool sometimes_retry_successful_ = true;
+};
+
+class ExactlyOnceRpcTest : public RpcTestBase {
+ public:
+  void SetUp() override {
+    RpcTestBase::SetUp();
+    SeedRandom();
+  }
+
+  void StartServer() {
+    // Set up server.
+    StartTestServerWithGeneratedCode(&server_addr_);
+    client_messenger_ = CreateMessenger("Client");
+    proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_));
+    test_picker_.reset(new TestServerPicker(proxy_.get()));
+    request_tracker_.reset(new RequestTracker(kClientId));
+    attempt_nos_ = 0;
+  }
+
+  // An exactly once adder that uses RetriableRpc to perform the requests.
+  // 'server_sleep' is forwarded to the rpc, which sets it as the request's 'sleep_for_ms'.
+  struct RetriableRpcExactlyOnceAdder {
+    RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
+                     const scoped_refptr<RequestTracker>& request_tracker,
+                     const shared_ptr<Messenger>& messenger,
+                     int value,
+                     int server_sleep = 0) : latch_(1) {
+      MonoTime deadline = MonoTime::Now();
+      deadline.AddDelta(MonoDelta::FromMilliseconds(10000));
+      // The rpc deletes itself in Finish(), right after counting down 'latch_'.
+      rpc_ = new CalculatorServiceRpc(server_picker, request_tracker, deadline, messenger,
+                                      value, &latch_, server_sleep);
+    }
+
+    // Runs SleepAndSend() on a newly created thread, which is stored in 'thread'.
+    void Start() {
+      CHECK_OK(kudu::Thread::Create(
+                   "test",
+                   "test",
+                   &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread));
+    }
+
+    // Sends the rpc and blocks until the rpc's Finish() counts down the latch.
+    void SleepAndSend() {
+      rpc_->SendRpc();
+      latch_.Wait();
+    }
+
+    CountDownLatch latch_;
+    scoped_refptr<kudu::Thread> thread;
+    CalculatorServiceRpc* rpc_;
+  };
+
+  // An exactly once adder meant to run concurrently with others using the same sequence number:
+  // it sleeps, then sends one synchronous call; the test checks the add was applied only once.
+  struct SimultaneousExactlyOnceAdder {
+    SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p,
+                     ResultTracker::SequenceNumber sequence_number,
+                     int value,
+                     uint64_t client_sleep,
+                     uint64_t server_sleep,
+                     int64_t attempt_no)
+     : proxy(p),
+       client_sleep_for_ms(client_sleep) {
+      req.set_value_to_add(value);
+      req.set_sleep_for_ms(server_sleep);
+      AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+    }
+
+    void Start() {
+      CHECK_OK(kudu::Thread::Create(
+          "test",
+          "test",
+          &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread));
+    }
+
+    // Sleeps the preset number of msecs before sending the call.
+    void SleepAndSend() {
+      usleep(client_sleep_for_ms * 1000);
+      controller.set_timeout(MonoDelta::FromSeconds(20));
+      CHECK_OK(proxy->AddExactlyOnce(req, &resp, &controller));
+    }
+
+    CalculatorServiceProxy* const proxy;
+    const uint64_t client_sleep_for_ms;
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    scoped_refptr<kudu::Thread> thread;
+  };
+
+
+  void CheckValueMatches(int expected_value) {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(0);
+    ExactlyOnceResponsePB resp;
+    RequestTracker::SequenceNumber seq_no;
+    CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
+    AddRequestId(&controller, kClientId, seq_no, 0);
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(resp.current_val(), expected_value);
+    request_tracker_->RpcCompleted(seq_no);
+  }
+
+  // Continuously runs GC on the ResultTracker.
+  void RunGcThread(MonoDelta run_for) {
+    MonoTime run_until = MonoTime::Now();
+    run_until.AddDelta(run_for);
+    while (MonoTime::Now().ComesBefore(run_until)) {
+      result_tracker_->GCResults();
+      SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+    }
+  }
+
+
+  // This continuously issues calls to the server, that often last longer than
+  // 'remember_responses_ttl_ms', making sure that we don't get errors back.
+  void DoLongWritesThread(MonoDelta run_for) {
+    MonoTime run_until = MonoTime::Now();
+    run_until.AddDelta(run_for);
+    int counter = 0;
+    while (MonoTime::Now().ComesBefore(run_until)) {
+      unique_ptr<RetriableRpcExactlyOnceAdder> adder(new RetriableRpcExactlyOnceAdder(
+          test_picker_, request_tracker_, client_messenger_, 1,
+          rand() % (2 * FLAGS_remember_responses_ttl_ms)));
+
+      // This thread is used in the stress test where we're constantly running GC.
+      // So, once we get a "success" response, it's likely that the result will be
+      // GCed on the server side, and thus it's not safe to spuriously retry.
+      adder->rpc_->sometimes_retry_successful_ = false;
+      adder->SleepAndSend();
+      SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+      counter++;
+    }
+    ExactlyOnceResponsePB response;
+    ResultTracker::SequenceNumber sequence_number;
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+    CHECK_OK(MakeAddCall(sequence_number, 0, &response));
+    CHECK_EQ(response.current_val(), counter);
+    request_tracker_->RpcCompleted(sequence_number);
+  }
+
+  // Stubbornly sends the same request to the server, this should observe three states.
+  // The request should be successful at first, then its result should be GCed and the
+  // client should be GCed.
+  void StubbornlyWriteTheSameRequestThread(MonoDelta run_for) {
+    MonoTime run_until = MonoTime::Now();
+    run_until.AddDelta(run_for);
+    // Make an initial request, so that we get a response to compare to.
+    ResultTracker::SequenceNumber sequence_number;
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+    ExactlyOnceResponsePB original_response;
+    CHECK_OK(MakeAddCall(sequence_number, 0, &original_response));
+
+    // Now repeat the same request. At first we should get the same response, then the result
+    // should be GCed and we should get STALE back. Finally the request should succeed again
+    // but we should get a new response.
+    bool result_gced = false;
+    bool client_gced = false;
+    while (MonoTime::Now().ComesBefore(run_until)) {
+      ExactlyOnceResponsePB response;
+      Status s = MakeAddCall(sequence_number, 0, &response);
+      if (s.ok()) {
+        if (!result_gced) {
+          CHECK_EQ(response.ShortDebugString(), original_response.ShortDebugString());
+        } else {
+          client_gced = true;
+          CHECK_NE(response.ShortDebugString(), original_response.ShortDebugString());
+        }
+        SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+      } else if (s.IsRemoteError()) {
+        result_gced = true;
+        SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms * 2));
+      }
+    }
+    CHECK(result_gced);
+    CHECK(client_gced);
+  }
+
+  Status MakeAddCall(ResultTracker::SequenceNumber sequence_number,
+                     int value_to_add,
+                     ExactlyOnceResponsePB* response,
+                     int attempt_no = -1) {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(value_to_add);
+    if (attempt_no == -1) attempt_no = attempt_nos_.fetch_add(1);
+    AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+    Status s = proxy_->AddExactlyOnce(req, response, &controller);
+    return s;
+  }
+
+ protected:
+  Sockaddr server_addr_;
+  atomic_int attempt_nos_;
+  shared_ptr<Messenger> client_messenger_;
+  std::unique_ptr<CalculatorServiceProxy> proxy_;
+  scoped_refptr<TestServerPicker> test_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
+};
+
+// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
+// same sequence number as previous requests.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
+  StartServer();
+  ExactlyOnceResponsePB original_resp;
+  int mem_consumption = mem_tracker_->consumption();
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(1);
+
+    // Assign id 0.
+    AddRequestId(&controller, kClientId, 0, 0);
+
+    // Send the request the first time.
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));
+
+    // The incremental usage of a new client is the size of the response itself
+    // plus some fixed overhead for the client-tracking structure.
+    int expected_incremental_usage = original_resp.SpaceUsed() + 200;
+
+    // The consumption isn't immediately updated, since the MemTracker update
+    // happens after we call 'Respond' on the RPC.
+    int mem_consumption_after;
+    AssertEventually([&]() {
+        mem_consumption_after = mem_tracker_->consumption();
+        ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
+      });
+    mem_consumption = mem_consumption_after;
+  }
+
+  // Now repeat the rpc 10 times, using the same sequence number, none of these should be executed
+  // and they should get the same response back.
+  for (int i = 0; i < 10; i++) {
+    RpcController controller;
+    controller.set_timeout(MonoDelta::FromSeconds(20));
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(1);
+    ExactlyOnceResponsePB resp;
+    AddRequestId(&controller, kClientId, 0, i + 1);
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(resp.current_val(), 1);
+    ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
+    // Sleep to give the MemTracker time to update -- we don't expect any update,
+    // but if we had a bug here, we'd only see it with this sleep.
+    SleepFor(MonoDelta::FromMilliseconds(100));
+    // We shouldn't have consumed any more memory since the responses were cached.
+    ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
+  }
+
+  // Making a new request, from a new client, should double the memory consumption.
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    req.set_value_to_add(1);
+
+    // Assign id 0.
+    AddRequestId(&controller, "test-client2", 0, 0);
+
+    // Send the first request for this new client.
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    AssertEventually([&]() {
+        ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
+      });
+  }
+}
+
+// Performs a series of requests in which each single request is attempted multiple times, as
+// the server side is instructed to spuriously fail attempts.
+// In CalculatorServiceRpc we make sure that the same response is returned by all retries and,
+// after all the rpcs are done, we make sure that the final result is the expected one.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+  StartServer();
+  int kNumIterations = 10;
+  int kNumRpcs = 10;
+
+  if (AllowSlowTests()) {
+    kNumIterations = 100;
+    kNumRpcs = 100;
+  }
+
+  int count = 0;
+  for (int i = 0; i < kNumIterations; i ++) {
+    vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders;
+    for (int j = 0; j < kNumRpcs; j++) {
+      unique_ptr<RetriableRpcExactlyOnceAdder> adder(
+          new RetriableRpcExactlyOnceAdder(test_picker_, request_tracker_, client_messenger_, j));
+      adders.push_back(std::move(adder));
+      adders[j]->Start();
+      count += j;
+    }
+    for (int j = 0; j < kNumRpcs; j++) {
+      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+    }
+    CheckValueMatches(count);
+  }
+}
+
+// Performs a series of requests in which each single request is attempted by multiple threads.
+// On each iteration, after all the threads complete, we expect that the add operation was
+// executed exactly once.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
+  StartServer();
+  int kNumIterations = 10;
+  int kNumThreads = 10;
+
+  if (AllowSlowTests()) {
+    kNumIterations = 100;
+    kNumThreads = 100;
+  }
+
+  ResultTracker::SequenceNumber sequence_number = 0;
+  int memory_consumption_initial = mem_tracker_->consumption();
+  int single_response_size = 0;
+
+  // Issue an initial call for this client. NOTE(review): 'single_response_size' is never set.
+  ExactlyOnceResponsePB resp;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+
+  for (int i = 1; i <= kNumIterations; i ++) {
+    vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
+    for (int j = 0; j < kNumThreads; j++) {
+      unique_ptr<SimultaneousExactlyOnceAdder> adder(
+          new SimultaneousExactlyOnceAdder(proxy_.get(), i, 1,
+                                           rand() % 20,
+                                           rand() % 10,
+                                           attempt_nos_.fetch_add(1)));
+      adders.push_back(std::move(adder));
+      adders[j]->Start();
+    }
+    uint64_t time_micros = 0;
+    for (int j = 0; j < kNumThreads; j++) {
+      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+      ASSERT_EQ(adders[j]->resp.current_val(), i + 1);
+      if (time_micros == 0) {
+        time_micros = adders[j]->resp.current_time_micros();
+      } else {
+        ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
+      }
+    }
+
+    // Wait for the MemTracker to be updated.
+    // After all adders finished we should have consumed at least the size of one more response.
+    // The actual size depends on multiple factors, for instance, how many calls were "attached"
+    // (which is timing dependent) so we can't be more precise than this.
+    AssertEventually([&]() {
+        ASSERT_GT(mem_tracker_->consumption(),
+                  memory_consumption_initial + single_response_size * i);
+      });
+  }
+}
+
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
+  FLAGS_remember_clients_ttl_ms = 1000;
+  FLAGS_remember_responses_ttl_ms = 100;
+
+  StartServer();
+
+  // Make a request.
+  ExactlyOnceResponsePB original;
+  ResultTracker::SequenceNumber sequence_number = 0;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &original));
+
+  // Making the same request again, should return the same response.
+  ExactlyOnceResponsePB resp;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+  ASSERT_EQ(original.ShortDebugString(), resp.ShortDebugString());
+
+  // Now sleep for 'remember_responses_ttl_ms' and run GC, we should then
+  // get a STALE back.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_responses_ttl_ms));
+  int64_t memory_consumption = mem_tracker_->consumption();
+  result_tracker_->GCResults();
+  ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+  resp.Clear();
+  Status s = MakeAddCall(sequence_number, 1, &resp);
+  ASSERT_TRUE(s.IsRemoteError());
+  ASSERT_STR_CONTAINS(s.ToString(), "is stale");
+
+  // Sleep again, this time for 'remember_clients_ttl_ms' and run GC again.
+  // The request should be successful, but its response should be a new one.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms));
+  memory_consumption = mem_tracker_->consumption();
+  result_tracker_->GCResults();
+  ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+  resp.Clear();
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+  ASSERT_NE(resp.ShortDebugString(), original.ShortDebugString());
+}
+
+// This test creates a thread continuously making requests to the server, some lasting longer
+// than the GC period, at the same time it runs GC, making sure that the corresponding
+// CompletionRecords/ClientStates are not deleted from underneath the ongoing requests.
+// This also creates a thread that runs GC very frequently and another thread that sends the
+// same request over and over and observes the possible states: request is ok, request is stale,
+// request is ok again (because the client was forgotten).
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
+  FLAGS_remember_clients_ttl_ms = 100;
+  FLAGS_remember_responses_ttl_ms = 10;
+
+  StartServer();
+
+  // The write thread runs for the shortest period to make sure client GC has a
+  // chance to run.
+  MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
+  MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
+  // GC runs for the longest period because the stubborn thread may wait beyond its deadline
+  // to wait on client GC.
+  MonoDelta gc_run_for = MonoDelta::FromSeconds(4);
+  if (AllowSlowTests()) {
+    writes_run_for = MonoDelta::FromSeconds(10);
+    stubborn_run_for = MonoDelta::FromSeconds(11);
+    gc_run_for = MonoDelta::FromSeconds(12);
+  }
+
+  scoped_refptr<kudu::Thread> gc_thread;
+  scoped_refptr<kudu::Thread> write_thread;
+  scoped_refptr<kudu::Thread> stubborn_thread;
+  CHECK_OK(kudu::Thread::Create(
+      "gc", "gc", &ExactlyOnceRpcTest::RunGcThread, this, gc_run_for, &gc_thread));
+  CHECK_OK(kudu::Thread::Create(
+      "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
+      this, writes_run_for, &write_thread));
+  CHECK_OK(kudu::Thread::Create(
+      "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
+      this, stubborn_run_for, &stubborn_thread));
+
+  gc_thread->Join();
+  write_thread->Join();
+  stubborn_thread->Join();
+
+  result_tracker_->GCResults();
+  ASSERT_EQ(0, mem_tracker_->consumption());
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index 649ad15..a75b3e1 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -17,15 +17,34 @@
 
 #include "kudu/rpc/result_tracker.h"
 
+#include <algorithm>
+#include <limits>
+
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/pb_util.h"
 
+DEFINE_int64(remember_clients_ttl_ms, 3600 * 1000 /* 1 hour */,
+    "Maximum amount of time, in milliseconds, the server \"remembers\" a client for the "
+    "purpose of caching its responses. After this period without hearing from it, the "
+    "client is no longer remembered and the memory occupied by its responses is reclaimed. "
+    "Retries of requests older than 'remember_clients_ttl_ms' are treated as new "
+    "ones.");
+TAG_FLAG(remember_clients_ttl_ms, advanced);
+
+DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */,
+    "Maximum amount of time, in milliseconds, the server \"remembers\" a response to a "
+    "specific request for a client. After this period has elapsed, the response may have "
+    "been garbage collected and the client might get a response indicating the request is "
+    "STALE.");
+TAG_FLAG(remember_responses_ttl_ms, advanced);
+
 namespace kudu {
 namespace rpc {
 
@@ -76,9 +95,8 @@ ResultTracker::~ResultTracker() {
   lock_guard<simple_spinlock> l(lock_);
   // Release all the memory for the stuff we'll delete on destruction.
   for (auto& client_state : clients_) {
-    for (auto& completion_record : client_state.second->completion_records) {
-      mem_tracker_->Release(completion_record.second->memory_footprint());
-    }
+    client_state.second->GCCompletionRecords(
+        mem_tracker_, [] (SequenceNumber, CompletionRecord*){ return true; });
     mem_tracker_->Release(client_state.second->memory_footprint());
   }
 }
@@ -99,16 +117,39 @@ ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& reque
       [&]{
         unique_ptr<ClientState> client_state(new ClientState(mem_tracker_));
         mem_tracker_->Consume(client_state->memory_footprint());
+        client_state->stale_before_seq_no = request_id.first_incomplete_seq_no();
         return client_state;
       })->get();
 
   client_state->last_heard_from = MonoTime::Now();
 
+  // If the arriving request is older than our per-client GC watermark, report its
+  // staleness to the client.
+  if (PREDICT_FALSE(request_id.seq_no() < client_state->stale_before_seq_no)) {
+    if (context) {
+      context->call_->RespondFailure(
+          ErrorStatusPB::ERROR_REQUEST_STALE,
+          Status::Incomplete(Substitute("Request with id { $0 } is stale.",
+                                        request_id.ShortDebugString())));
+      delete context;
+    }
+    return RpcState::STALE;
+  }
+
+  // GC records according to the client's first incomplete watermark.
+  client_state->GCCompletionRecords(
+      mem_tracker_,
+      [&] (SequenceNumber seq_no, CompletionRecord* completion_record) {
+        return completion_record->state != RpcState::IN_PROGRESS &&
+            seq_no < request_id.first_incomplete_seq_no();
+      });
+
   auto result = ComputeIfAbsentReturnAbsense(
       &client_state->completion_records,
       request_id.seq_no(),
       [&]{
-        unique_ptr<CompletionRecord> completion_record(new CompletionRecord());
+        unique_ptr<CompletionRecord> completion_record(new CompletionRecord(
+            RpcState::IN_PROGRESS, request_id.attempt_no()));
         mem_tracker_->Consume(completion_record->memory_footprint());
         return completion_record;
       });
@@ -117,8 +158,6 @@ ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& reque
   ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
 
   if (PREDICT_TRUE(result.second)) {
-    completion_record->state = RpcState::IN_PROGRESS;
-    completion_record->driver_attempt_no = request_id.attempt_no();
     // When a follower is applying an operation it doesn't have a response yet, and it won't
     // have a context, so only set them if they exist.
     if (response != nullptr) {
@@ -129,6 +168,7 @@ ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& reque
     return RpcState::NEW;
   }
 
+  completion_record->last_updated = MonoTime::Now();
   switch (completion_record->state) {
     case RpcState::COMPLETED: {
       // If the RPC is COMPLETED and the request originates from a client (context, response are
@@ -172,6 +212,7 @@ ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB&
   completion_record->ongoing_rpcs.push_back({nullptr,
                                              nullptr,
                                              request_id.attempt_no()});
+
   // Since we changed the driver of the RPC, return NEW, so that the caller knows
   // to store the result.
   return RpcState::NEW;
@@ -259,6 +300,7 @@ void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
   completion_record->response.reset(DCHECK_NOTNULL(response)->New());
   completion_record->response->CopyFrom(*response);
   completion_record->state = RpcState::COMPLETED;
+  completion_record->last_updated = MonoTime::Now();
 
   CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no());
 
@@ -301,6 +343,8 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
     return;
   }
 
+  completion_record->last_updated = MonoTime::Now();
+
   int64_t seq_no = request_id.seq_no();
   int64_t handler_attempt_no = request_id.attempt_no();
 
@@ -367,6 +411,54 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
   FailAndRespondInternal(request_id, func);
 }
 
+void ResultTracker::GCResults() {
+  lock_guard<simple_spinlock> l(lock_);
+  MonoTime now = MonoTime::Now();
+  // Calculate the instants before which we'll start GCing ClientStates and CompletionRecords.
+  MonoTime time_to_gc_clients_from = now;
+  time_to_gc_clients_from.AddDelta(
+      MonoDelta::FromMilliseconds(-FLAGS_remember_clients_ttl_ms));
+  MonoTime time_to_gc_responses_from = now;
+  time_to_gc_responses_from.AddDelta(
+      MonoDelta::FromMilliseconds(-FLAGS_remember_responses_ttl_ms));
+
+  // Now go through the ClientStates. If we haven't heard from a client in a while
+  // GC it and all its completion records (making sure there isn't actually one in progress first).
+  // If we've heard from a client recently, but some of its responses are old, GC those responses.
+  for (auto iter = clients_.begin(); iter != clients_.end();) {
+    auto& client_state = iter->second;
+    if (client_state->last_heard_from.ComesBefore(time_to_gc_clients_from)) {
+      // Client should be GCed.
+      bool ongoing_request = false;
+      client_state->GCCompletionRecords(
+          mem_tracker_,
+          [&] (SequenceNumber, CompletionRecord* completion_record) {
+            if (PREDICT_FALSE(completion_record->state == RpcState::IN_PROGRESS)) {
+              ongoing_request = true;
+              return false;
+            }
+            return true;
+          });
+      // Don't delete the client state if there is still a request in execution.
+      if (PREDICT_FALSE(ongoing_request)) {
+        ++iter;
+        continue;
+      }
+      mem_tracker_->Release(client_state->memory_footprint());
+      iter = clients_.erase(iter);
+    } else {
+      // Client can't be GCed, but its calls might be GCable.
+      iter->second->GCCompletionRecords(
+          mem_tracker_,
+          [&] (SequenceNumber, CompletionRecord* completion_record) {
+            return completion_record->state != RpcState::IN_PROGRESS &&
+                completion_record->last_updated.ComesBefore(time_to_gc_responses_from);
+          });
+      ++iter;
+    }
+  }
+}
+
 string ResultTracker::ToString() {
   lock_guard<simple_spinlock> l(lock_);
   return ToStringUnlocked();
@@ -382,10 +474,35 @@ string ResultTracker::ToStringUnlocked() const {
   return result;
 }
 
+template<class MustGcRecordFunc>
+void ResultTracker::ClientState::GCCompletionRecords(
+    const shared_ptr<kudu::MemTracker>& mem_tracker,
+    MustGcRecordFunc must_gc_record_func) {
+  ScopedMemTrackerUpdater<ClientState> updater(mem_tracker.get(), this);
+  for (auto iter = completion_records.begin(); iter != completion_records.end();) {
+    if (must_gc_record_func(iter->first, iter->second.get())) {
+      mem_tracker->Release(iter->second->memory_footprint());
+      SequenceNumber deleted_seq_no = iter->first;
+      iter = completion_records.erase(iter);
+      // Each time we GC a response, update 'stale_before_seq_no'.
+      // This allows us to tell clients that a request is stale if it arrives
+      // with a sequence number lower than or equal to the one just deleted.
+      stale_before_seq_no = std::max(deleted_seq_no + 1, stale_before_seq_no);
+      continue;
+    }
+    // Since we store completion records in order, if we found one that shouldn't be GCed,
+    // don't GC anything after it.
+    return;
+  }
+}
+
 string ResultTracker::ClientState::ToString() const {
-  string result = Substitute("Client State[Last heard from: $1, Num. Completion "
-                                 "Records: $2, CompletionRecords:\n", completion_records.size(),
-                             last_heard_from.ToString());
+  auto since_last_heard =
+      MonoTime::Now().GetDeltaSince(last_heard_from);
+  string result = Substitute("Client State[Last heard from: $0s ago, "
+                             "$1 CompletionRecords:",
+                             since_last_heard.ToString(),
+                             completion_records.size());
   for (auto& completion_record : completion_records) {
     SubstituteAndAppend(&result, Substitute("\n\tCompletion Record: $0, $1",
                                             completion_record.first,
@@ -396,9 +513,12 @@ string ResultTracker::ClientState::ToString() const {
 }
 
 string ResultTracker::CompletionRecord::ToString() const {
-  string result = Substitute("Completion Record[State: $0, Driver: $1, Num. Ongoing RPCs: $2, "
-                                 "Cached response: $3, OngoingRpcs:", state, driver_attempt_no,
-                             ongoing_rpcs.size(), response ? response->ShortDebugString() : "None");
+  string result = Substitute("Completion Record[State: $0, Driver: $1, "
+                             "Cached response: $2, $3 OngoingRpcs:",
+                             state,
+                             driver_attempt_no,
+                             response ? response->ShortDebugString() : "None",
+                             ongoing_rpcs.size());
   for (auto& orpc : ongoing_rpcs) {
     SubstituteAndAppend(&result, Substitute("\n\t$0", orpc.ToString()));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index 86d87b2..5c1d518 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -222,6 +222,16 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
                       int error_ext_id, const std::string& message,
                       const google::protobuf::Message& app_error_pb);
 
+  // Runs time-based garbage collection on the results this result tracker is caching.
+  // When garbage collection runs, it goes through all ClientStates and:
+  // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
+  //   requests are in progress, GCs the ClientState and all its CompletionRecords.
+  // - If a ClientState is newer than the 'remember_clients_ttl_ms' flag, goes
+  //   through all CompletionRecords and:
+  //   - If the CompletionRecord is older than the 'remember_responses_ttl_ms' flag,
+  //     GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
+  void GCResults();
+
   string ToString();
 
  private:
@@ -236,12 +246,24 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
   };
   // A completion record for an IN_PROGRESS or COMPLETED RPC.
   struct CompletionRecord {
+    CompletionRecord(RpcState state, int64_t driver_attempt_no)
+        : state(state),
+          driver_attempt_no(driver_attempt_no),
+          last_updated(MonoTime::Now()) {
+    }
+
     // The current state of the RPC.
     RpcState state;
+
     // The attempt number that is/was "driving" this RPC.
     int64_t driver_attempt_no;
+
+    // The timestamp of the last CompletionRecord update.
+    MonoTime last_updated;
+
     // The cached response, if this RPC is in COMPLETED state.
     std::unique_ptr<google::protobuf::Message> response;
+
     // The set of ongoing RPCs that correspond to this record.
     std::vector<OnGoingRpcInfo> ongoing_rpcs;
 
@@ -254,6 +276,7 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
           + (response.get() != nullptr ? response->SpaceUsed() : 0);
     }
   };
+
   // The state corresponding to a single client.
   struct ClientState {
     typedef MemTrackerAllocator<
@@ -265,12 +288,31 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
                      CompletionRecordMapAllocator> CompletionRecordMap;
 
     explicit ClientState(std::shared_ptr<MemTracker> mem_tracker)
-     : completion_records(CompletionRecordMap::key_compare(),
-                          CompletionRecordMapAllocator(std::move(mem_tracker))) {}
+        : stale_before_seq_no(0),
+          completion_records(CompletionRecordMap::key_compare(),
+                             CompletionRecordMapAllocator(std::move(mem_tracker))) {}
 
+    // The last time we've heard from this client.
     MonoTime last_heard_from;
+
+    // The sequence number of the first response we remember for this client.
+    // All sequence numbers before this one are considered STALE.
+    SequenceNumber stale_before_seq_no;
+
+    // The (un gc'd) CompletionRecords for this client.
     CompletionRecordMap completion_records;
 
+    // Garbage collects this client's CompletionRecords for which MustGcRecordFunc returns
+    // true. We use a lambda here so that we can have a single method that GCs and releases
+    // the memory for CompletionRecords based on different policies.
+    //
+    // 'func' should have the following signature:
+    //   bool MyFunction(SequenceNumber seq_no, CompletionRecord* record);
+    //
+    template<class MustGcRecordFunc>
+    void GCCompletionRecords(const std::shared_ptr<kudu::MemTracker>& mem_tracker,
+                             MustGcRecordFunc func);
+
     std::string ToString() const;
 
     // Calculates the memory footprint of this struct.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index f2d3ab5..9d4fdeb 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -89,6 +89,7 @@ class RetriableRpc : public Rpc {
   ResponsePB resp_;
 
  private:
+  friend class CalculatorServiceRpc;
   // Decides whether to retry the RPC, based on the result of AnalyzeResponse() and retries
   // if that is the case.
   // Returns true if the RPC was retried or false otherwise.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/rpc-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-stress-test.cc b/src/kudu/rpc/rpc-stress-test.cc
deleted file mode 100644
index 2d1215e..0000000
--- a/src/kudu/rpc/rpc-stress-test.cc
+++ /dev/null
@@ -1,402 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/rpc/retriable_rpc.h"
-#include "kudu/rpc/rpc.h"
-#include "kudu/rpc/rpc-test-base.h"
-
-using std::atomic_int;
-using std::shared_ptr;
-using std::unique_ptr;
-
-namespace kudu {
-namespace rpc {
-
-namespace {
-
-const char* kClientId = "test-client";
-
-void AddRequestId(RpcController* controller,
-                  const std::string& client_id,
-                  ResultTracker::SequenceNumber sequence_number,
-                  int64_t attempt_no) {
-  unique_ptr<RequestIdPB> request_id(new RequestIdPB());
-  request_id->set_client_id(client_id);
-  request_id->set_seq_no(sequence_number);
-  request_id->set_attempt_no(attempt_no);
-  request_id->set_first_incomplete_seq_no(sequence_number);
-  controller->SetRequestIdPB(std::move(request_id));
-}
-
-class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
- public:
-  explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {}
-
-  void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override {
-    callback.Run(Status::OK(), proxy_);
-  }
-
-  void MarkServerFailed(CalculatorServiceProxy*, const Status&) override {}
-  void MarkReplicaNotLeader(CalculatorServiceProxy*) override {}
-  void MarkResourceNotFound(CalculatorServiceProxy*) override {}
-
- private:
-  CalculatorServiceProxy* proxy_;
-};
-
-class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
-                                                 ExactlyOnceRequestPB,
-                                                 ExactlyOnceResponsePB> {
- public:
-  CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker,
-                       const scoped_refptr<RequestTracker>& request_tracker,
-                       const MonoTime& deadline,
-                       const shared_ptr<Messenger>& messenger,
-                       int value,
-                       CountDownLatch* latch)
-      : RetriableRpc(server_picker, request_tracker, deadline, messenger), latch_(latch) {
-    req_.set_value_to_add(value);
-    req_.set_randomly_fail(true);
-  }
-
-  void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
-    server->AddExactlyOnceAsync(req_,
-                                &resp_,
-                                mutable_retrier()->mutable_controller(),
-                                callback);
-  }
-
-  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override {
-    // We shouldn't get errors from the server/rpc system since we set a high timeout.
-    CHECK_OK(rpc_cb_status);
-
-    RetriableRpcStatus status;
-    if (!mutable_retrier()->controller().status().ok()) {
-      CHECK(mutable_retrier()->controller().status().IsRemoteError());
-      status.result = RetriableRpcStatus::SERVER_BUSY;
-      return status;
-    }
-
-    // If the controller is not finished we're in the ReplicaFoundCb() callback.
-    // Return ok to proceed with the call to the server.
-    if (!mutable_retrier()->mutable_controller()->finished()) {
-      status.result = RetriableRpcStatus::OK;
-      return status;
-    }
-
-    // If we've received a response in the past, all following responses must
-    // match.
-    if (!successful_response_.IsInitialized()) {
-      successful_response_.CopyFrom(resp_);
-    } else {
-      CHECK_EQ(successful_response_.DebugString(), resp_.DebugString());
-    }
-
-    // Still report errors, with some probability. This will cause requests to
-    // be retried. Since the requests were originally successful we should get
-    // the same reply back.
-    int random = rand() % 4;
-    switch (random) {
-      case 0: status.result = RetriableRpcStatus::SERVER_BUSY; break;
-      case 1: status.result = RetriableRpcStatus::RESOURCE_NOT_FOUND; break;
-      case 2: status.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE; break;
-      case 3: status.result = RetriableRpcStatus::OK; break;
-      default: LOG(FATAL) << "Unexpected value";
-    }
-    return status;
-  }
-
-  void Finish(const Status& status) override {
-    CHECK_OK(status);
-    latch_->CountDown();
-    delete this;
-  }
-
-  std::string ToString() const override { return "test-rpc"; }
-  CountDownLatch* latch_;
-  ExactlyOnceResponsePB successful_response_;
-};
-
-} // namespace
-
-class RpcStressTest : public RpcTestBase {
- public:
-  void SetUp() override {
-    RpcTestBase::SetUp();
-    // Set up server.
-    StartTestServerWithGeneratedCode(&server_addr_);
-    client_messenger_ = CreateMessenger("Client");
-    proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_));
-    test_picker_.reset(new TestServerPicker(proxy_.get()));
-    request_tracker_.reset(new RequestTracker(kClientId));
-    attempt_nos_ = 0;
-  }
-
-  // An exactly once adder that uses RetriableRpc to perform the requests.
-  struct RetriableRpcExactlyOnceAdder {
-    RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
-                     const scoped_refptr<RequestTracker>& request_tracker,
-                     const shared_ptr<Messenger>& messenger,
-                     int value) : latch_(1) {
-      MonoTime now = MonoTime::Now();
-      now.AddDelta(MonoDelta::FromMilliseconds(10000));
-      rpc_ = new CalculatorServiceRpc(server_picker,
-                                      request_tracker,
-                                      now,
-                                      messenger,
-                                      value,
-                                      &latch_);
-    }
-
-    void Start() {
-      CHECK_OK(kudu::Thread::Create(
-                   "test",
-                   "test",
-                   &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread));
-    }
-
-    void SleepAndSend() {
-      rpc_->SendRpc();
-      latch_.Wait();
-    }
-
-    CountDownLatch latch_;
-    scoped_refptr<kudu::Thread> thread;
-    CalculatorServiceRpc* rpc_;
-  };
-
-  // An exactly once adder that sends multiple, simultaneous calls, to the server
-  // and makes sure that only one of the calls was successful.
-  struct SimultaneousExactlyOnceAdder {
-    SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p,
-                     ResultTracker::SequenceNumber sequence_number,
-                     int value,
-                     uint64_t client_sleep,
-                     uint64_t server_sleep,
-                     int64_t attempt_no)
-     : proxy(p),
-       client_sleep_for_ms(client_sleep) {
-      req.set_value_to_add(value);
-      req.set_sleep_for_ms(server_sleep);
-      AddRequestId(&controller, kClientId, sequence_number, attempt_no);
-    }
-
-    void Start() {
-      CHECK_OK(kudu::Thread::Create(
-          "test",
-          "test",
-          &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread));
-    }
-
-    // Sleeps the preset number of msecs before sending the call.
-    void SleepAndSend() {
-      usleep(client_sleep_for_ms * 1000);
-      controller.set_timeout(MonoDelta::FromSeconds(20));
-      CHECK_OK(proxy->AddExactlyOnce(req, &resp, &controller));
-    }
-
-    CalculatorServiceProxy* const proxy;
-    const uint64_t client_sleep_for_ms;
-    RpcController controller;
-    ExactlyOnceRequestPB req;
-    ExactlyOnceResponsePB resp;
-    scoped_refptr<kudu::Thread> thread;
-  };
-
-
-  void CheckValueMatches(int expected_value) {
-    RpcController controller;
-    ExactlyOnceRequestPB req;
-    req.set_value_to_add(0);
-    ExactlyOnceResponsePB resp;
-    RequestTracker::SequenceNumber seq_no;
-    CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
-    AddRequestId(&controller, kClientId, seq_no, 0);
-    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
-    ASSERT_EQ(resp.current_val(), expected_value);
-    request_tracker_->RpcCompleted(seq_no);
-  }
-
-
- protected:
-  Sockaddr server_addr_;
-  atomic_int attempt_nos_;
-  shared_ptr<Messenger> client_messenger_;
-  std::unique_ptr<CalculatorServiceProxy> proxy_;
-  scoped_refptr<TestServerPicker> test_picker_;
-  scoped_refptr<RequestTracker> request_tracker_;
-};
-
-// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
-// same sequence number as previous request.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
-  ExactlyOnceResponsePB original_resp;
-  int mem_consumption = mem_tracker_->consumption();
-  {
-    RpcController controller;
-    ExactlyOnceRequestPB req;
-    req.set_value_to_add(1);
-
-    // Assign id 0.
-    AddRequestId(&controller, kClientId, 0, 0);
-
-    // Send the request the first time.
-    ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));
-
-    // The incremental usage of a new client is the size of the response itself
-    // plus some fixed overhead for the client-tracking structure.
-    int expected_incremental_usage = original_resp.SpaceUsed() + 200;
-
-    // The consumption isn't immediately updated, since the MemTracker update
-    // happens after we call 'Respond' on the RPC.
-    int mem_consumption_after;
-    AssertEventually([&]() {
-        mem_consumption_after = mem_tracker_->consumption();
-        ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
-      });
-    mem_consumption = mem_consumption_after;
-  }
-
-  // Now repeat the rpc 10 times, using the same sequence number, none of these should be executed
-  // and they should get the same response back.
-  for (int i = 0; i < 10; i++) {
-    RpcController controller;
-    controller.set_timeout(MonoDelta::FromSeconds(20));
-    ExactlyOnceRequestPB req;
-    req.set_value_to_add(1);
-    ExactlyOnceResponsePB resp;
-    AddRequestId(&controller, kClientId, 0, i + 1);
-    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
-    ASSERT_EQ(resp.current_val(), 1);
-    ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
-    // Sleep to give the MemTracker time to update -- we don't expect any update,
-    // but if we had a bug here, we'd only see it with this sleep.
-    SleepFor(MonoDelta::FromMilliseconds(100));
-    // We shouldn't have consumed any more memory since the responses were cached.
-    ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
-  }
-
-  // Making a new request, from a new client, should double the memory consumption.
-  {
-    RpcController controller;
-    ExactlyOnceRequestPB req;
-    ExactlyOnceResponsePB resp;
-    req.set_value_to_add(1);
-
-    // Assign id 0.
-    AddRequestId(&controller, "test-client2", 0, 0);
-
-    // Send the first request for this new client.
-    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
-    AssertEventually([&]() {
-        ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
-      });
-  }
-}
-
-// Performs a series of requests in which each single request is attempted multiple times, as
-// the server side is instructed to spuriously fail attempts.
-// In CalculatorServiceRpc we sure that the same response is returned by all retries and,
-// after all the rpcs are done, we make sure that final result is the expected one.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
-  int kNumIterations = 10;
-  int kNumRpcs = 10;
-
-  if (AllowSlowTests()) {
-    kNumIterations = 100;
-    kNumRpcs = 100;
-  }
-
-  int count = 0;
-  for (int i = 0; i < kNumIterations; i ++) {
-    vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders;
-    for (int j = 0; j < kNumRpcs; j++) {
-      unique_ptr<RetriableRpcExactlyOnceAdder> adder(
-          new RetriableRpcExactlyOnceAdder(test_picker_, request_tracker_, client_messenger_, j));
-      adders.push_back(std::move(adder));
-      adders[j]->Start();
-      count += j;
-    }
-    for (int j = 0; j < kNumRpcs; j++) {
-      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
-    }
-    CheckValueMatches(count);
-  }
-}
-
-// Performs a series of requests in which each single request is attempted by multiple threads.
-// On each iteration, after all the threads complete, we expect that the add operation was
-// executed exactly once.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
-  int kNumIterations = 10;
-  int kNumThreads = 10;
-
-  if (AllowSlowTests()) {
-    kNumIterations = 100;
-    kNumThreads = 100;
-  }
-
-  ResultTracker::SequenceNumber sequence_number = 0;
-  int memory_consumption_initial = mem_tracker_->consumption();
-  int single_response_size = 0;
-
-  // Measure memory consumption for a single response from the same client.
-  {
-    RpcController controller;
-    ExactlyOnceRequestPB req;
-    ExactlyOnceResponsePB resp;
-    req.set_value_to_add(1);
-    AddRequestId(&controller, kClientId, sequence_number, 0);
-    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
-    single_response_size = resp.SpaceUsed();
-  }
-
-  for (int i = 1; i <= kNumIterations; i ++) {
-    vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
-    for (int j = 0; j < kNumThreads; j++) {
-      unique_ptr<SimultaneousExactlyOnceAdder> adder(
-          new SimultaneousExactlyOnceAdder(proxy_.get(), i, 1,
-                                           rand() % 20,
-                                           rand() % 10,
-                                           attempt_nos_.fetch_add(1)));
-      adders.push_back(std::move(adder));
-      adders[j]->Start();
-    }
-    uint64_t time_micros = 0;
-    for (int j = 0; j < kNumThreads; j++) {
-      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
-      ASSERT_EQ(adders[j]->resp.current_val(), i + 1);
-      if (time_micros == 0) {
-        time_micros = adders[j]->resp.current_time_micros();
-      } else {
-        ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
-      }
-    }
-
-    // Wait for the MemTracker to be updated.
-    // After all adders finished we should at least the size of one more response.
-    // The actual size depends of multiple factors, for instance, how many calls were "attached"
-    // (which is timing dependent) so we can't be more precise than this.
-    AssertEventually([&]() {
-        ASSERT_GT(mem_tracker_->consumption(),
-                  memory_consumption_initial + single_response_size * i);
-      });
-  }
-}
-
-} // namespace rpc
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 0292c74..8adc5e2 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -123,7 +123,7 @@ message RequestIdPB {
 
   // The sequence number of the first RPC that has not been marked as completed by the client.
   // Unset if there isn't an incomplete RPC.
-  optional int64 first_incomplete_seq_no = 3;
+  required int64 first_incomplete_seq_no = 3;
 
   // The number of times this RPC has been tried.
   // Set to 1 in the first attempt.
@@ -216,6 +216,10 @@ message ErrorStatusPB {
     // or the server does not support the required feature flags.
     ERROR_INVALID_REQUEST = 5;
 
+    // The server might have previously received this request but its response is no
+    // longer cached. It's unknown whether the request was executed or not.
+    ERROR_REQUEST_STALE = 6;
+
     // FATAL_* errors indicate that the client should shut down the connection.
     //------------------------------------------------------------
     // The RPC server is already shutting down.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index 8b47ce9..f3863f2 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -117,6 +117,9 @@ void GeneratedServiceIf::Handle(InboundCall *call) {
         break;
       case ResultTracker::COMPLETED:
       case ResultTracker::IN_PROGRESS:
+      case ResultTracker::STALE:
+        // ResultTracker has already responded to the RPC and deleted
+        // 'ctx'.
         return;
       default:
         LOG(FATAL) << "Unknown state: " << state;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index f84fb1b..135f7cc 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1245,11 +1245,15 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
         << write->tablet_id() << ". State: " << 0 << " id: "
         << replicate_msg->request_id().DebugString();
     // We only replay committed requests so the result of tracking this request can be:
-    // NEW - This is a previously untracked request, or we changed the driver -> store the result
-    // COMPLETED - We've bootstrapped this tablet twice, and previously stored the result -> do
-    //             nothing.
+    // NEW:
+    //   This is a previously untracked request, or we changed the driver -> store the result
+    // COMPLETED or STALE:
+    //   We've bootstrapped this tablet twice, and previously stored the result (which, if
+    //   STALE, has since been garbage collected) -> do nothing.
     state = result_tracker_->TrackRpcOrChangeDriver(replicate_msg->request_id());
-    CHECK(state == ResultTracker::RpcState::NEW || state == ResultTracker::RpcState::COMPLETED)
+    CHECK(state == ResultTracker::RpcState::NEW ||
+          state == ResultTracker::RpcState::COMPLETED ||
+          state == ResultTracker::RpcState::STALE)
         << "Wrong state: " << state;
     response.reset(new WriteResponsePB());
     response->set_timestamp(replicate_msg->timestamp());

http://git-wip-us.apache.org/repos/asf/kudu/blob/fc09ddcd/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 974969d..c97d71b 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -217,13 +217,14 @@ void TransactionDriver::RegisterFollowerTransactionOnResultTracker() {
     case ResultTracker::RpcState::NEW:
       // We're the only ones trying to execute the transaction (normal case). Proceed.
       return;
-      // If this RPC was previously completed (like if the same tablet was bootstrapped twice)
-      // stop tracking the result. Only follower transactions can observe this state so we
-      // simply reset the callback and the result will not be tracked anymore.
+      // If this RPC was previously completed or is already stale (e.g. if the same tablet was
+      // bootstrapped twice), stop tracking the result. Only follower transactions can observe
+      // these states, so we simply reset the callback and the result will not be tracked anymore.
+    case ResultTracker::RpcState::STALE:
     case ResultTracker::RpcState::COMPLETED: {
       mutable_state()->set_completion_callback(
           gscoped_ptr<TransactionCompletionCallback>(new TransactionCompletionCallback()));
-      VLOG(1) << state()->result_tracker() << " Follower Rpc was not NEW or IN_PROGRESS: "
+      VLOG(2) << state()->result_tracker() << " Follower Rpc was already COMPLETED or STALE: "
           << rpc_state << " OpId: " << state()->op_id().ShortDebugString()
           << " RequestId: " << state()->request_id().ShortDebugString();
       return;