You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/14 22:36:50 UTC

[2/4] incubator-kudu git commit: Integrate the request tracker with the client

Integrate the request tracker with the client

This integrates the request tracker with the client, making sure
that write requests sent by the client are tracked in the RequestTracker
and contain the information necessary for exactly once semantics.

This adds to rpc-stress-test a test that uses RetriableRpc, instead
of using the proxy directly.

Change-Id: I94207c294452fcbdb3a7901fdb702674d47553ee
Reviewed-on: http://gerrit.cloudera.org:8080/3080
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/16b5bd27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/16b5bd27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/16b5bd27

Branch: refs/heads/master
Commit: 16b5bd27ced666e774ad8fcceccd662cd634a4dc
Parents: 5d4d2b1
Author: David Alves <da...@cloudera.com>
Authored: Tue May 3 13:24:03 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 21:29:00 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc         |   7 +-
 src/kudu/client/client-internal.cc |   1 +
 src/kudu/client/client-internal.h  |   4 +
 src/kudu/client/client.cc          |   2 +
 src/kudu/rpc/retriable_rpc.h       |  52 +++++++++-
 src/kudu/rpc/rpc-stress-test.cc    | 171 ++++++++++++++++++++++++++++++++
 src/kudu/rpc/rpc-test-base.h       |   7 ++
 src/kudu/rpc/rpc_controller.cc     |   2 +-
 src/kudu/rpc/rpc_controller.h      |   3 +
 src/kudu/rpc/rtest.proto           |   1 +
 10 files changed, 244 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index c167215..e15d135 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -45,6 +45,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/retriable_rpc.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/tserver/tserver_service.proxy.h"
@@ -62,6 +63,7 @@ namespace kudu {
 
 using rpc::ErrorStatusPB;
 using rpc::Messenger;
+using rpc::RequestTracker;
 using rpc::ResponseCallback;
 using rpc::RetriableRpc;
 using rpc::RetriableRpcStatus;
@@ -189,6 +191,7 @@ class WriteRpc : public RetriableRpc<RemoteTabletServer, WriteRequestPB, WriteRe
  public:
   WriteRpc(const scoped_refptr<Batcher>& batcher,
            const scoped_refptr<MetaCacheServerPicker>& replica_picker,
+           const scoped_refptr<RequestTracker>& request_tracker,
            vector<InFlightOp*> ops,
            const MonoTime& deadline,
            const shared_ptr<Messenger>& messenger,
@@ -225,11 +228,12 @@ class WriteRpc : public RetriableRpc<RemoteTabletServer, WriteRequestPB, WriteRe
 
 WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
                    const scoped_refptr<MetaCacheServerPicker>& replica_picker,
+                   const scoped_refptr<RequestTracker>& request_tracker,
                    vector<InFlightOp*> ops,
                    const MonoTime& deadline,
                    const shared_ptr<Messenger>& messenger,
                    const string& tablet_id)
-    : RetriableRpc(replica_picker, deadline, messenger),
+    : RetriableRpc(replica_picker, request_tracker, deadline, messenger),
       batcher_(batcher),
       ops_(std::move(ops)),
       tablet_id_(tablet_id) {
@@ -706,6 +710,7 @@ void Batcher::FlushBuffer(RemoteTablet* tablet, const vector<InFlightOp*>& ops)
                                 tablet));
   WriteRpc* rpc = new WriteRpc(this,
                                server_picker,
+                               client_->data_->request_tracker_,
                                ops,
                                deadline_,
                                client_->data_->messenger_,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index b525571..4a802bc 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -35,6 +35,7 @@
 #include "kudu/master/master_rpc.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
+#include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 7bf001a..6028bb3 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -44,6 +44,7 @@ class MasterServiceProxy;
 
 namespace rpc {
 class Messenger;
+class RequestTracker;
 class RpcController;
 } // namespace rpc
 
@@ -196,6 +197,9 @@ class KuduClient::Data {
   // The unique id of this client.
   std::string client_id_;
 
+  // The request tracker for this client.
+  scoped_refptr<rpc::RequestTracker> request_tracker_;
+
   std::shared_ptr<rpc::Messenger> messenger_;
   gscoped_ptr<DnsResolver> dns_resolver_;
   scoped_refptr<internal::MetaCache> meta_cache_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8507817..6c7b967 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -241,6 +241,8 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) {
   RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(),
                         "Could not determine local host names");
 
+  c->data_->request_tracker_ = new rpc::RequestTracker(c->data_->client_id_);
+
   client->swap(c);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 526385a..f2d3ab5 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -21,13 +21,19 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/util/monotime.h"
 
 namespace kudu {
 namespace rpc {
 
+namespace internal {
+typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+}
+
 // A base class for retriable RPCs that handles replica picking and retry logic.
 //
 // The 'Server' template parameter refers to the the type of the server that will be looked up
@@ -44,12 +50,18 @@ template <class Server, class RequestPB, class ResponsePB>
 class RetriableRpc : public Rpc {
  public:
   RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker,
+               const scoped_refptr<RequestTracker>& request_tracker,
                const MonoTime& deadline,
                const std::shared_ptr<Messenger>& messenger)
    : Rpc(deadline, messenger),
-     server_picker_(server_picker) {}
+     server_picker_(server_picker),
+     request_tracker_(request_tracker),
+     sequence_number_(RequestTracker::NO_SEQ_NO),
+     num_attempts_(0) {}
 
-  virtual ~RetriableRpc() {}
+  virtual ~RetriableRpc() {
+    DCHECK_EQ(sequence_number_, RequestTracker::NO_SEQ_NO);
+  }
 
   // Performs server lookup/initialization.
   // If/when the server is looked up and initialized successfully RetriableRpc will call
@@ -88,10 +100,20 @@ class RetriableRpc : public Rpc {
   // Called when after the RPC was performed.
   void SendRpcCb(const Status& status) override;
 
+  // Performs final cleanup, after the RPC is done (independently of success).
+  void FinishInternal();
+
   scoped_refptr<ServerPicker<Server>> server_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
   const MonoTime deadline_;
   std::shared_ptr<Messenger> messenger_;
 
+  // The sequence number for this RPC.
+  internal::SequenceNumber sequence_number_;
+
+  // The number of times this RPC has been attempted
+  int32 num_attempts_;
+
   // Keeps track of the replica the RPCs were sent to.
   // TODO Remove this and pass the used replica around. For now we need to keep this as
   // the retrier calls the SendRpcCb directly and doesn't know the replica that was
@@ -101,6 +123,9 @@ class RetriableRpc : public Rpc {
 
 template <class Server, class RequestPB, class ResponsePB>
 void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc()  {
+  if (sequence_number_ == RequestTracker::NO_SEQ_NO) {
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number_));
+  }
   server_picker_->PickLeader(Bind(&RetriableRpc::ReplicaFoundCb,
                                   Unretained(this)),
                              retrier().deadline());
@@ -144,16 +169,34 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableR
 }
 
 template <class Server, class RequestPB, class ResponsePB>
+void RetriableRpc<Server, RequestPB, ResponsePB>::FinishInternal() {
+  // Mark the RPC as completed and set the sequence number to NO_SEQ_NO to make
+  // sure we're in the appropriate state before destruction.
+  request_tracker_->RpcCompleted(sequence_number_);
+  sequence_number_ = RequestTracker::NO_SEQ_NO;
+}
+
+template <class Server, class RequestPB, class ResponsePB>
 void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
                                                                  Server* server) {
   RetriableRpcStatus result = AnalyzeResponse(status);
   if (RetryIfNeeded(result, server)) return;
 
   if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
+    FinishInternal();
     Finish(result.status);
     return;
   }
 
+  // We successfully found a replica, so prepare the RequestIdPB before we send out the call.
+  std::unique_ptr<RequestIdPB> request_id(new RequestIdPB());
+  request_id->set_client_id(request_tracker_->client_id());
+  request_id->set_seq_no(sequence_number_);
+  request_id->set_first_incomplete_seq_no(request_tracker_->FirstIncomplete());
+  request_id->set_attempt_no(num_attempts_++);
+
+  mutable_retrier()->mutable_controller()->SetRequestIdPB(std::move(request_id));
+
   DCHECK_EQ(result.result, RetriableRpcStatus::OK);
   current_ = server;
   Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
@@ -164,7 +207,9 @@ void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status
   RetriableRpcStatus result = AnalyzeResponse(status);
   if (RetryIfNeeded(result, current_)) return;
 
-  // From here on out the RPC has either succeeded of suffered a non-retriable
+  FinishInternal();
+
+  // From here on out the rpc has either succeeded of suffered a non-retriable
   // failure.
   Status final_status = result.status;
   if (!final_status.ok()) {
@@ -179,6 +224,5 @@ void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status
   Finish(final_status);
 }
 
-
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/rpc-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-stress-test.cc b/src/kudu/rpc/rpc-stress-test.cc
index c815ea1..3be88de 100644
--- a/src/kudu/rpc/rpc-stress-test.cc
+++ b/src/kudu/rpc/rpc-stress-test.cc
@@ -41,6 +41,95 @@ void AddRequestId(RpcController* controller,
   controller->SetRequestIdPB(std::move(request_id));
 }
 
+class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
+ public:
+  explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {}
+
+  void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override {
+    callback.Run(Status::OK(), proxy_);
+  }
+
+  void MarkServerFailed(CalculatorServiceProxy*, const Status&) override {}
+  void MarkReplicaNotLeader(CalculatorServiceProxy*) override {}
+  void MarkResourceNotFound(CalculatorServiceProxy*) override {}
+
+ private:
+  CalculatorServiceProxy* proxy_;
+};
+
+class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
+                                                 ExactlyOnceRequestPB,
+                                                 ExactlyOnceResponsePB> {
+ public:
+  CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker,
+                       const scoped_refptr<RequestTracker>& request_tracker,
+                       const MonoTime& deadline,
+                       const shared_ptr<Messenger>& messenger,
+                       int value,
+                       CountDownLatch* latch)
+      : RetriableRpc(server_picker, request_tracker, deadline, messenger), latch_(latch) {
+    req_.set_value_to_add(value);
+    req_.set_randomly_fail(true);
+  }
+
+  void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
+    server->AddExactlyOnceAsync(req_,
+                                &resp_,
+                                mutable_retrier()->mutable_controller(),
+                                callback);
+  }
+
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override {
+    // We shouldn't get errors from the server/rpc system since we set a high timeout.
+    CHECK_OK(rpc_cb_status);
+
+    RetriableRpcStatus status;
+    if (!mutable_retrier()->controller().status().ok()) {
+      CHECK(mutable_retrier()->controller().status().IsRemoteError());
+      status.result = RetriableRpcStatus::SERVER_BUSY;
+      return status;
+    }
+
+    // If the controller is not finished we're in the ReplicaFoundCb() callback.
+    // Return ok to proceed with the call to the server.
+    if (!mutable_retrier()->mutable_controller()->finished()) {
+      status.result = RetriableRpcStatus::OK;
+      return status;
+    }
+
+    // If we've received a response in the past, all following responses must
+    // match.
+    if (!successful_response_.IsInitialized()) {
+      successful_response_.CopyFrom(resp_);
+    } else {
+      CHECK_EQ(successful_response_.DebugString(), resp_.DebugString());
+    }
+
+    // Still report errors, with some probability. This will cause requests to
+    // be retried. Since the requests were originally successful we should get
+    // the same reply back.
+    int random = rand() % 4;
+    switch (random) {
+      case 0: status.result = RetriableRpcStatus::SERVER_BUSY; break;
+      case 1: status.result = RetriableRpcStatus::RESOURCE_NOT_FOUND; break;
+      case 2: status.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE; break;
+      case 3: status.result = RetriableRpcStatus::OK; break;
+      default: LOG(FATAL) << "Unexpected value";
+    }
+    return status;
+  }
+
+  void Finish(const Status& status) override {
+    CHECK_OK(status);
+    latch_->CountDown();
+    delete this;
+  }
+
+  std::string ToString() const override { return "test-rpc"; }
+  CountDownLatch* latch_;
+  ExactlyOnceResponsePB successful_response_;
+};
+
 } // namespace
 
 class RpcStressTest : public RpcTestBase {
@@ -51,9 +140,44 @@ class RpcStressTest : public RpcTestBase {
     StartTestServerWithGeneratedCode(&server_addr_);
     client_messenger_ = CreateMessenger("Client");
     proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_));
+    test_picker_.reset(new TestServerPicker(proxy_.get()));
+    request_tracker_.reset(new RequestTracker(kClientId));
     attempt_nos_ = 0;
   }
 
+  // An exactly once adder that uses RetriableRpc to perform the requests.
+  struct RetriableRpcExactlyOnceAdder {
+    RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
+                     const scoped_refptr<RequestTracker>& request_tracker,
+                     const shared_ptr<Messenger>& messenger,
+                     int value) : latch_(1) {
+      MonoTime now = MonoTime::Now(MonoTime::FINE);
+      now.AddDelta(MonoDelta::FromMilliseconds(10000));
+      rpc_ = new CalculatorServiceRpc(server_picker,
+                                      request_tracker,
+                                      now,
+                                      messenger,
+                                      value,
+                                      &latch_);
+    }
+
+    void Start() {
+      CHECK_OK(kudu::Thread::Create(
+                   "test",
+                   "test",
+                   &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread));
+    }
+
+    void SleepAndSend() {
+      rpc_->SendRpc();
+      latch_.Wait();
+    }
+
+    CountDownLatch latch_;
+    scoped_refptr<kudu::Thread> thread;
+    CalculatorServiceRpc* rpc_;
+  };
+
   // An exactly once adder that sends multiple, simultaneous calls, to the server
   // and makes sure that only one of the calls was successful.
   struct SimultaneousExactlyOnceAdder {
@@ -92,11 +216,28 @@ class RpcStressTest : public RpcTestBase {
     scoped_refptr<kudu::Thread> thread;
   };
 
+
+  void CheckValueMatches(int expected_value) {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(0);
+    ExactlyOnceResponsePB resp;
+    RequestTracker::SequenceNumber seq_no;
+    CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
+    AddRequestId(&controller, seq_no, 0);
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(resp.current_val(), expected_value);
+    request_tracker_->RpcCompleted(seq_no);
+  }
+
+
  protected:
   Sockaddr server_addr_;
   atomic_int attempt_nos_;
   shared_ptr<Messenger> client_messenger_;
   std::unique_ptr<CalculatorServiceProxy> proxy_;
+  scoped_refptr<TestServerPicker> test_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
 };
 
 // Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
@@ -130,6 +271,36 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
   }
 }
 
+// Performs a series of requests in which each single request is attempted multiple times, as
+// the server side is instructed to spuriously fail attempts.
+// In CalculatorServiceRpc we sure that the same response is returned by all retries and,
+// after all the rpcs are done, we make sure that final result is the expected one.
+TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+  int kNumIterations = 10;
+  int kNumRpcs = 10;
+
+  if (AllowSlowTests()) {
+    kNumIterations = 100;
+    kNumRpcs = 100;
+  }
+
+  int count = 0;
+  for (int i = 0; i < kNumIterations; i ++) {
+    vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders;
+    for (int j = 0; j < kNumRpcs; j++) {
+      unique_ptr<RetriableRpcExactlyOnceAdder> adder(
+          new RetriableRpcExactlyOnceAdder(test_picker_, request_tracker_, client_messenger_, j));
+      adders.push_back(std::move(adder));
+      adders[j]->Start();
+      count += j;
+    }
+    for (int j = 0; j < kNumRpcs; j++) {
+      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+    }
+    CheckValueMatches(count);
+  }
+}
+
 // Performs a series of requests in which each single request is attempted by multiple threads.
 // On each iteration, after all the threads complete, we expect that the add operation was
 // executed exactly once.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 1b317a6..ec0edc9 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -267,6 +267,13 @@ class CalculatorService : public CalculatorServiceIf {
     if (req->sleep_for_ms() > 0) {
       usleep(req->sleep_for_ms() * 1000);
     }
+    // If failures are enabled, cause them some percentage of the time.
+    if (req->randomly_fail()) {
+      if (rand() % 10 < 3) {
+        context->RespondFailure(Status::ServiceUnavailable("Random injected failure."));
+        return;
+      }
+    }
     int result = exactly_once_test_val_ += req->value_to_add();
     resp->set_current_val(result);
     resp->set_current_time_micros(GetCurrentTimeMicros());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 64a97f7..97bd262 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -100,7 +100,7 @@ bool RpcController::has_request_id() const {
 
 const RequestIdPB& RpcController::request_id() const {
   DCHECK(has_request_id());
-  return *request_id_.get();
+  return *request_id_;
 }
 
 void RpcController::RequireServerFeature(uint32_t feature) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index 4fd362b..fbe94c7 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -113,6 +113,9 @@ class RpcController {
   bool has_request_id() const;
 
   // Returns the currently set request id.
+  // When the request is sent to the server, it gets "moved" from RpcController
+  // so an absence of a request after send doesn't mean one wasn't sent.
+  // REQUIRES: the controller has a request ID set.
   const RequestIdPB& request_id() const;
 
   // Add a requirement that the server side must support a feature with the

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/16b5bd27/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index 740e20b..b138258 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -110,6 +110,7 @@ enum FeatureFlags {
 message ExactlyOnceRequestPB {
   optional uint32 sleep_for_ms = 1 [default = 0];
   required uint32 value_to_add = 2;
+  optional bool randomly_fail = 3 [default = false];
 }
 message ExactlyOnceResponsePB {
   required uint32 current_val = 1;