Posted to commits@kudu.apache.org by al...@apache.org on 2017/04/29 02:44:18 UTC

kudu git commit: [rpc] handling ERROR_UNAVAILABLE RPC error

Repository: kudu
Updated Branches:
  refs/heads/master eb79a7183 -> d4cffa503


[rpc] handling ERROR_UNAVAILABLE RPC error

This patch adds handling of the newly introduced ERROR_UNAVAILABLE RPC
error code.  The ERROR_UNAVAILABLE error code is a broader counterpart
of ERROR_SERVER_TOO_BUSY.

From the client side, both ERROR_UNAVAILABLE and ERROR_SERVER_TOO_BUSY
codes mean it's worth retrying the call at a later time.  To reflect
that, the internal codes {RetriableRpcStatus,ScanRpcStatus}::SERVER_BUSY
are replaced with the more generic SERVICE_UNAVAILABLE.

Added an integration test to cover the behavior of the server components
and the Kudu C++ client when a client sends an authn token signed by a
TSK unknown to the master and tablet servers.
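
For reference, the client-side classification converged on by this patch
can be summarized in the following minimal sketch (distilled from the
batcher.cc hunk below; the free function ClassifyRpcError is illustrative
and not a symbol introduced by the patch, and includes are omitted):

    // Map a finished call's status and optional RPC-level error response
    // to the retry classification used by the client.
    RetriableRpcStatus::Result ClassifyRpcError(const Status& status,
                                                const ErrorStatusPB* err) {
      // A remote error carrying TOO_BUSY or UNAVAILABLE means "retry later".
      if (status.IsRemoteError() && err && err->has_code() &&
          (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
           err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
        return RetriableRpcStatus::SERVICE_UNAVAILABLE;
      }
      // A ServiceUnavailable status (e.g. produced while mapping negotiation
      // errors; see client_negotiation.cc below) is treated the same way.
      if (status.IsServiceUnavailable()) {
        return RetriableRpcStatus::SERVICE_UNAVAILABLE;
      }
      // Any network failure triggers failover to another replica.
      if (status.IsNetworkError()) {
        return RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
      }
      return RetriableRpcStatus::NON_RETRIABLE_ERROR;
    }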

Change-Id: I87d780a4ad88c15ceaacfddf6c1b69ed053bb959
Reviewed-on: http://gerrit.cloudera.org:8080/6640
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d4cffa50
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d4cffa50
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d4cffa50

Branch: refs/heads/master
Commit: d4cffa503ae8d913bf3834bcc92f75f80d1f640f
Parents: eb79a71
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Apr 14 18:21:28 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Sat Apr 29 02:43:11 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc                      |  21 +-
 src/kudu/client/client-internal.cc              |   5 +-
 src/kudu/client/client.h                        |   2 +
 src/kudu/client/scanner-internal.cc             |   8 +-
 src/kudu/client/scanner-internal.h              |   8 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../security-unknown-tsk-itest.cc               | 431 +++++++++++++++++++
 src/kudu/integration-tests/test_workload.cc     |  23 +-
 src/kudu/integration-tests/test_workload.h      |  12 +
 src/kudu/rpc/client_negotiation.cc              |   2 +
 src/kudu/rpc/exactly_once_rpc-test.cc           |  19 +-
 src/kudu/rpc/outbound_call.h                    |   2 +-
 src/kudu/rpc/retriable_rpc.h                    |   7 +-
 src/kudu/rpc/rpc.cc                             |   9 +-
 src/kudu/rpc/rpc.h                              |  10 +-
 src/kudu/rpc/rpc_context.h                      |   4 +-
 16 files changed, 529 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index a3d9b7a..1586cae 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -30,13 +30,13 @@
 #include <glog/logging.h>
 
 #include "kudu/client/callbacks.h"
-#include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/session-internal.h"
-#include "kudu/client/write_op.h"
 #include "kudu/client/write_op-internal.h"
+#include "kudu/client/write_op.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/row_operations.h"
 #include "kudu/common/wire_protocol.h"
@@ -49,6 +49,7 @@
 #include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/retriable_rpc.h"
 #include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/logging.h"
@@ -337,19 +338,25 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
     result.status = mutable_retrier()->controller().status();
   }
 
+  // Check for specific RPC errors.
   if (result.status.IsRemoteError()) {
     const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
-    if (err &&
-        err->has_code() &&
-        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
-      result.result = RetriableRpcStatus::SERVER_BUSY;
+    if (err && err->has_code() &&
+        (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+         err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+      result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
       return result;
     }
   }
 
+  if (result.status.IsServiceUnavailable()) {
+    result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+    return result;
+  }
+
   // Failover to a replica in the event of any network failure or of a DNS resolution problem.
   //
-  // TODO: This is probably too harsh; some network failures should be
+  // TODO(adar): This is probably too harsh; some network failures should be
   // retried on the current replica.
   if (result.status.IsNetworkError()) {
     result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 1f61e0f..3f45d02 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -186,7 +186,10 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
       const ErrorStatusPB* err = rpc.error_response();
       if (err &&
           err->has_code() &&
-          err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
+          (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+           err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+        // The UNAVAILABLE error code is a broader counterpart of
+        // SERVER_TOO_BUSY. In both cases it's worth retrying a bit later.
         continue;
       }
     }
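
The hunk above sits inside the retry loop of SyncLeaderMasterRpc; the
control flow is roughly the following (a simplified sketch, assuming the
types from the Kudu tree; SendRpcOnce is a hypothetical helper standing
in for a single RPC attempt):

    // Simplified: re-send the RPC to the leader master until the deadline,
    // treating TOO_BUSY/UNAVAILABLE responses as transient conditions.
    Status SyncLeaderMasterRpcSketch(const MonoTime& deadline,
                                     rpc::RpcController* rpc) {
      while (MonoTime::Now() < deadline) {
        Status s = SendRpcOnce(rpc);  // hypothetical: one attempt
        if (s.IsRemoteError()) {
          const rpc::ErrorStatusPB* err = rpc->error_response();
          if (err && err->has_code() &&
              (err->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
               err->code() == rpc::ErrorStatusPB::ERROR_UNAVAILABLE)) {
            continue;  // transient: retry a bit later
          }
        }
        return s;  // success or a non-retriable error
      }
      return Status::TimedOut("timed out waiting for leader master");
    }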

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 33a95de..a52c479 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -52,6 +52,7 @@ namespace kudu {
 class ClientStressTest_TestUniqueClientIds_Test;
 class LinkedListTester;
 class PartitionSchema;
+class SecurityUnknownTskTest;
 
 namespace tools {
 class LeaderMasterProxy;
@@ -527,6 +528,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class KuduTable;
   friend class KuduTableAlterer;
   friend class KuduTableCreator;
+  friend class ::kudu::SecurityUnknownTskTest;
   friend class tools::LeaderMasterProxy;
 
   FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index c804046..8ca16b7 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -83,7 +83,7 @@ Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
   bool can_retry = true;
   bool backoff = false;
   switch (err.result) {
-    case ScanRpcStatus::SERVER_BUSY:
+    case ScanRpcStatus::SERVICE_UNAVAILABLE:
       backoff = true;
       break;
     case ScanRpcStatus::RPC_DEADLINE_EXCEEDED:
@@ -173,8 +173,10 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
       switch (controller_.error_response()->code()) {
         case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST:
           return ScanRpcStatus{ScanRpcStatus::INVALID_REQUEST, rpc_status};
-        case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
-          return ScanRpcStatus{ScanRpcStatus::SERVER_BUSY, rpc_status};
+        case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: // fall-through
+        case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
+          return ScanRpcStatus{
+              ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status};
         default:
           return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 4292f7e..154ba42 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -46,8 +46,12 @@ struct ScanRpcStatus {
     // The request was malformed (e.g. bad schema, etc).
     INVALID_REQUEST,
 
-    // The server was busy (e.g. RPC queue overflow).
-    SERVER_BUSY,
+    // The server received the request but was not ready to serve it right
+    // away. For example, it may have been too busy, or lacked the resources
+    // or information necessary to serve the request, but it anticipates
+    // being ready again soon. In this case it's worth retrying the request
+    // at a later time.
+    SERVICE_UNAVAILABLE,
 
     // The deadline for the whole batch was exceeded.
     OVERALL_DEADLINE_EXCEEDED,

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 2ef0fd2..d81ad37 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -79,6 +79,7 @@ ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
 ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(security-faults-itest)
 ADD_KUDU_TEST(security-itest)
+ADD_KUDU_TEST(security-unknown-tsk-itest)
 ADD_KUDU_TEST(table_locations-itest)
 ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/security-unknown-tsk-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/security-unknown-tsk-itest.cc b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
new file mode 100644
index 0000000..e610b6c
--- /dev/null
+++ b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
@@ -0,0 +1,431 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <boost/none.hpp>
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/schema.h"
+#include "kudu/client/client.h"
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/schema.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/master-test-util.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_bool(rpc_trace_negotiation);
+DECLARE_int32(heartbeat_interval_ms);
+
+using std::atomic;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+namespace kudu {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduError;
+using client::KuduInsert;
+using client::KuduRowResult;
+using client::KuduScanner;
+using client::KuduSchema;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduTableCreator;
+using client::sp::shared_ptr;
+using security::DataFormat;
+using security::PrivateKey;
+using security::SignedTokenPB;
+using security::TokenPB;
+using security::TokenSigner;
+using security::TokenSigningPrivateKeyPB;
+using security::TokenVerifier;
+using security::VerificationResult;
+
+class SecurityUnknownTskTest : public KuduTest {
+ public:
+  SecurityUnknownTskTest()
+      : num_tablet_servers_(3),
+        heartbeat_interval_ms_(100),
+        schema_(client::KuduSchemaFromSchema(CreateKeyValueTestSchema())) {
+
+    // Make the ts->master heartbeat interval shorter to run the test faster.
+    FLAGS_heartbeat_interval_ms = heartbeat_interval_ms_;
+
+    // Within the scope of the same reactor thread, close an already established
+    // idle connection to the server and open a new one upon making another call
+    // to the same server. This is to force authn token verification at each RPC
+    // call: the authn token is verified by the server side during connection
+    // negotiation. Since this test uses the in-process MiniCluster, this
+    // affects both Kudu clients and the server components; in the context of
+    // this test, that's crucial only for the Kudu clients.
+    FLAGS_rpc_reopen_outbound_connections = true;
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    MiniClusterOptions opts;
+    opts.num_tablet_servers = num_tablet_servers_;
+    cluster_.reset(new MiniCluster(env_, opts));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  void TearDown() override {
+    cluster_->Shutdown();
+  }
+
+  // Generate a custom TSK.
+  Status GenerateTsk(TokenSigningPrivateKeyPB* tsk, int64_t seq_num = 100) {
+    PrivateKey private_key;
+    RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+    string private_key_str_der;
+    RETURN_NOT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+    tsk->set_rsa_key_der(private_key_str_der);
+    // Key sequence number should be high enough to be greater than sequence
+    // numbers of the TSKs generated by the master itself.
+    tsk->set_key_seq_num(seq_num);
+    tsk->set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
+    return Status::OK();
+  }
+
+  // Generate an authn token signed by the specified TSK. Use the current
+  // client's authn token as a 'template' for the new one.
+  Status GenerateAuthnToken(const shared_ptr<KuduClient>& client,
+                            const TokenSigningPrivateKeyPB& tsk,
+                            SignedTokenPB* new_signed_token) {
+    // Should be already connected to the cluster.
+    boost::optional<SignedTokenPB> authn_token = client->data_->messenger_->authn_token();
+    if (authn_token == boost::none) {
+      return Status::RuntimeError("client authn token is not set");
+    }
+
+    // 'Copy' the token data. The idea is to remove the signature from the signed
+    // token and sign the token with our custom TSK (see below).
+    TokenSigner* signer = cluster_->mini_master()->master()->token_signer();
+    TokenPB token;
+    const TokenVerifier& verifier = signer->verifier();
+    if (verifier.VerifyTokenSignature(*authn_token, &token) != VerificationResult::VALID) {
+      return Status::RuntimeError("current client authn token is not valid");
+    }
+
+    // Create an authn token, signing it with the custom TSK.
+    if (!token.SerializeToString(new_signed_token->mutable_token_data())) {
+      return Status::RuntimeError("failed to serialize token data");
+    }
+
+    TokenSigner forger(1, 1);
+    RETURN_NOT_OK(forger.ImportKeys({ tsk }));
+    return forger.SignToken(new_signed_token);
+  }
+
+  // Replace client's authn token with the specified one.
+  void ReplaceAuthnToken(KuduClient* client, const SignedTokenPB& token) {
+    client->data_->messenger_->set_authn_token(token);
+  }
+
+  // Import the specified TSK into the master's TokenSigner. Once the TSK is
+  // imported, the master is able to verify authn tokens signed by the TSK.
+  // The tablet servers are able to verify corresponding authn tokens after
+  // they receive the TSK from the master in a heartbeat response.
+  Status ImportTsk(const TokenSigningPrivateKeyPB& tsk) {
+    TokenSigner* signer = cluster_->mini_master()->master()->token_signer();
+    return signer->ImportKeys({ tsk });
+  }
+
+ protected:
+  const int num_tablet_servers_;
+  const int32_t heartbeat_interval_ms_;
+  const KuduSchema schema_;
+  unique_ptr<MiniCluster> cluster_;
+};
+
+
+// A tablet server sends back the ERROR_UNAVAILABLE error code during
+// connection negotiation if it does not recognize the TSK which the client's
+// authn token is signed with. The client should receive a ServiceUnavailable
+// status in that case and retry the operation. This test exercises a common
+// subset of client-->master and client-->tserver RPCs, verifying both
+// success and failure scenarios for the selected subset of RPCs.
+TEST_F(SecurityUnknownTskTest, ErrorUnavailableCommonOperations) {
+  const string table_name = "security-unknown-tsk-itest";
+  const int64_t timeout_seconds = 1;
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(
+      &KuduClientBuilder()
+          .default_admin_operation_timeout(MonoDelta::FromSeconds(timeout_seconds))
+          .default_rpc_timeout(MonoDelta::FromSeconds(timeout_seconds)),
+      &client));
+
+  // Generate our custom TSK.
+  TokenSigningPrivateKeyPB tsk;
+  ASSERT_OK(GenerateTsk(&tsk));
+
+  // Create a new authn token, signing it with the custom TSK.
+  SignedTokenPB new_signed_token;
+  ASSERT_OK(GenerateAuthnToken(client, tsk, &new_signed_token));
+
+  // Create and open a table: a proper table handle is necessary for further
+  // RPC calls to the tablet server. The table consists of multiple tablets
+  // hosted by all available tablet servers, so insert and scan requests are
+  // sent to every available tablet server.
+  gscoped_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(table_creator->table_name(table_name)
+      .set_range_partition_columns({ "key" })
+      .add_hash_partitions({ "key" }, num_tablet_servers_)
+      .schema(&schema_)
+      .num_replicas(1)
+      .Create());
+  ASSERT_OK(client->OpenTable(table_name, &table));
+
+  shared_ptr<KuduSession> session = client->NewSession();
+  // We want to send the write batch to the server as soon as it's applied.
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+  // Insert a row into the table -- this is to populate the client's metadata
+  // cache so the client won't need to do so later when re-inserting the same
+  // data. Otherwise, the Apply() call for the duplicate insert would not be
+  // sent to the tablet server: instead, a request would first go to the
+  // master server to find out the location of the target tablet. The idea is
+  // to cover client-->tserver RPCs by this test as well.
+  {
+    unique_ptr<KuduInsert> ins(table->NewInsert());
+    ASSERT_OK(ins->mutable_row()->SetInt32(0, -1));
+    ASSERT_OK(ins->mutable_row()->SetInt32(1, -1));
+    ASSERT_OK(session->Apply(ins.release()));
+  }
+
+  // Replace the original authn token with the specially crafted one. From this
+  // point until importing the custom TSK into the master's verifier, the master
+  // and tablet servers should respond with ERROR_UNAVAILABLE because the client
+  // is about to present an authn token signed with an unknown TSK.
+  ReplaceAuthnToken(client.get(), new_signed_token);
+
+  // Try to create the table again: this time the RPC should fail because the
+  // authn token has been replaced (not because the table already exists).
+  {
+    const Status s = table_creator->table_name(table_name)
+        .set_range_partition_columns({ "key" })
+        .schema(&schema_)
+        .num_replicas(1)
+        .Create();
+    // The client automatically retries on getting ServiceUnavailable from the
+    // master. It will retry in vain until the operation times out.
+    const string err_msg = s.ToString();
+    ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "CreateTable timed out after deadline expired");
+  }
+
+  {
+    shared_ptr<KuduTable> table;
+    const Status s = client->OpenTable(table_name, &table);
+    const string err_msg = s.ToString();
+    ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "GetTableSchema timed out after deadline expired");
+  }
+
+  // Try to insert the same data which was successfully inserted prior to
+  // replacing the authn token. The idea is to exercise the client-->tserver
+  // path: the meta-cache already contains information on the corresponding
+  // tablet server, so the client will send RPCs to the tablet server
+  // directly, avoiding the calls to the master server that would happen if
+  // the meta-cache did not contain the tablet location.
+  {
+    unique_ptr<KuduInsert> ins(table->NewInsert());
+    ASSERT_OK(ins->mutable_row()->SetInt32(0, -1));
+    ASSERT_OK(ins->mutable_row()->SetInt32(1, -1));
+    const Status s_apply = session->Apply(ins.release());
+    // The error returned is a generic IOError, and the details are provided
+    // by the KuduSession::GetPendingErrors() method.
+    ASSERT_TRUE(s_apply.IsIOError()) << s_apply.ToString();
+    ASSERT_STR_CONTAINS(s_apply.ToString(), "Some errors occurred");
+
+    std::vector<KuduError*> errors;
+    ElementDeleter cleanup(&errors);
+
+    session->GetPendingErrors(&errors, nullptr);
+    ASSERT_EQ(1, errors.size());
+    ASSERT_NE(nullptr, errors[0]);
+    const Status& ps = errors[0]->status();
+    const string err_msg = ps.ToString();
+    // The client automatically retries on getting ServiceUnavailable from the
+    // tablet server. It will retry in vain until the operation times out.
+    ASSERT_TRUE(ps.IsTimedOut()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "Failed to write batch of 1 ops");
+  }
+
+  // Try opening a scanner. This should fail, timing out on retries.
+  {
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+    ASSERT_OK(scanner.SetTimeoutMillis(1000));
+    const Status s = scanner.Open();
+    const string err_msg = s.ToString();
+    ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "GetTableLocations");
+  }
+
+  // In a separate thread, import our TSK into the master's TokenSigner. After
+  // importing, the TSK should propagate to the tablet servers and the client
+  // should be able to authenticate using its custom authn token.
+  thread importer(
+    [&]() {
+      SleepFor(MonoDelta::FromMilliseconds(timeout_seconds * 1000 / 5));
+      CHECK_OK(ImportTsk(tsk));
+    });
+
+  // An automatic clean-up to handle failure cases in the code below.
+  auto importer_cleanup = MakeScopedCleanup([&]() {
+      importer.join();
+    });
+
+  // The client should retry its operations until the masters and tablet
+  // servers receive the signing key needed to verify our custom authn token.
+  for (int i = 0; i < num_tablet_servers_; ++i) {
+    unique_ptr<KuduInsert> ins(table->NewInsert());
+    ASSERT_OK(ins->mutable_row()->SetInt32(0, i));
+    ASSERT_OK(ins->mutable_row()->SetInt32(1, i));
+    ASSERT_OK(session->Apply(ins.release()));
+  }
+
+  // Run a scan to verify the number of inserted rows.
+  EXPECT_EQ(num_tablet_servers_ + 1, client::CountTableRows(table.get()));
+}
+
+// Replace the client's authn token while running a workload which includes
+// creating a table, inserting data, and reading it back. Over a huge number
+// of runs, this gives coverage of ERROR_UNAVAILABLE handling for all RPC
+// calls involved in the workload scenario.
+TEST_F(SecurityUnknownTskTest, ErrorUnavailableDuringWorkload) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  static const int64_t kTimeoutMs = 20 * 1000;
+  int64_t tsk_seq_num = 100;
+  // The target total runtime is less than 3 minutes, and in most cases less
+  // than 2 minutes. Sometimes a cycle might take two, and in rare cases up
+  // to three, timeout intervals to complete.
+  for (int i = 0; i < 3; ++i) {
+    TestWorkload w(cluster_.get());
+    w.set_num_tablets(num_tablet_servers_);
+    w.set_num_replicas(1);
+    w.set_num_read_threads(2);
+    w.set_num_write_threads(2);
+    w.set_write_batch_size(4096);
+    w.set_client_default_rpc_timeout_millis(kTimeoutMs);
+    w.set_read_timeout_millis(kTimeoutMs);
+    w.set_write_timeout_millis(kTimeoutMs);
+
+    auto client = w.CreateClient();
+    atomic<bool> importer_do_run(true);
+    thread importer(
+      [&]() {
+        // See below for the explanation.
+        MonoTime sync_point = MonoTime::Now();
+
+        while (importer_do_run) {
+          // Generate our custom TSK.
+          TokenSigningPrivateKeyPB tsk;
+
+          // The master's TokenSigner might be generating TSKs in the background
+          // according to its own schedule. The master's TokenSigner increments
+          // the sequence number by 1 for every new TSK generated. To avoid TSK
+          // sequence number collisions, it's necessary to increment the sequence
+          // number for our custom TSKs more aggressively.
+          tsk_seq_num += 10;
+          CHECK_OK(GenerateTsk(&tsk, tsk_seq_num));
+
+          // Create new authn token, signing it with the custom TSK.
+          SignedTokenPB new_signed_token;
+          CHECK_OK(GenerateAuthnToken(client, tsk, &new_signed_token));
+
+          ReplaceAuthnToken(client.get(), new_signed_token);
+          // From now until the ImportTsk() call, the system is unaware of the
+          // custom TSK, and the token signed with it cannot be verified.
+          SleepFor(MonoDelta::FromMilliseconds(rand() % 5 + 5));
+          CHECK_OK(ImportTsk(tsk));
+
+          // After the ImportTsk() call, the public part of the TSK needs to
+          // reach the tablet servers so they can verify the custom authn
+          // token. The delay is longer than the minimum required
+          // heartbeat_interval_ms_ to allow pending operations to complete
+          // when retrying with the 'exponential back-off' policy. Also, some
+          // clients might be deep into the retry sequence, unable to keep up
+          // with the rate of token replacement: they need to wake up and
+          // retry while the current TSK is known to the system. Due to the
+          // exponential back-off algorithm of the retry sequence, some
+          // clients might sleep for up to ~5 seconds between retries. To
+          // avoid timing out in such situations, a 'sync point' happens once
+          // every timeout interval, so long-sleeping clients are able to
+          // complete their operations. kSyncSleepIntervalMs is a little over
+          // the necessary ~5 seconds to avoid test flakiness on slow VMs.
+          static const int64_t kSyncSleepIntervalMs = 7500;
+          bool is_sync_point = (sync_point + MonoDelta::FromMilliseconds(
+              kTimeoutMs - kSyncSleepIntervalMs) <= MonoTime::Now());
+          const int64_t sleep_time_ms = is_sync_point
+              ? kSyncSleepIntervalMs : 5 * heartbeat_interval_ms_;
+          SleepFor(MonoDelta::FromMilliseconds(sleep_time_ms));
+          if (is_sync_point) {
+            sync_point = MonoTime::Now();
+          }
+        }
+      });
+
+    w.Setup();
+    w.Start();
+
+    // Let the workload run for some time.
+    const int64_t halfRun = kTimeoutMs / 2;
+    SleepFor(MonoDelta::FromMilliseconds(rand() % halfRun + halfRun));
+
+    w.StopAndJoin();
+    CHECK_OK(w.Cleanup());
+
+    importer_do_run = false;
+    importer.join();
+  }
+}
+
+} // namespace kudu
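
From an application's point of view, the behavior verified above means an
unknown-TSK condition surfaces as transparent retries rather than an
immediate authentication failure; the caller sees an error only once the
operation deadline expires. A hedged usage sketch (the master address and
table name are illustrative):

    #include "kudu/client/client.h"

    using kudu::client::KuduClient;
    using kudu::client::KuduClientBuilder;
    using kudu::client::KuduTable;
    using kudu::MonoDelta;
    using kudu::Status;

    Status OpenTableExample(kudu::client::sp::shared_ptr<KuduTable>* table) {
      kudu::client::sp::shared_ptr<KuduClient> client;
      Status s = KuduClientBuilder()
          .add_master_server_addr("master.example.com:7051")  // illustrative
          .default_rpc_timeout(MonoDelta::FromSeconds(10))
          .Build(&client);
      if (!s.ok()) return s;
      // With this patch, ERROR_UNAVAILABLE (like ERROR_SERVER_TOO_BUSY) is
      // retried internally; a TimedOut status here means retries were
      // exhausted, not that the authn token was rejected outright.
      return client->OpenTable("my_table", table);
    }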

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 5716718..5e98806 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -32,6 +32,7 @@
 
 namespace kudu {
 
+using client::KuduClient;
 using client::KuduInsert;
 using client::KuduScanBatch;
 using client::KuduScanner;
@@ -51,6 +52,9 @@ TestWorkload::TestWorkload(MiniClusterBase* cluster)
     payload_bytes_(11),
     num_write_threads_(4),
     num_read_threads_(0),
+    // Set a high scanner timeout so that we're likely to have a chance to scan, even in
+    // high-stress workloads.
+    read_timeout_millis_(60000),
     write_batch_size_(50),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
@@ -186,9 +190,7 @@ void TestWorkload::ReadThread() {
     SleepFor(MonoDelta::FromMilliseconds(150));
 
     KuduScanner scanner(table.get());
-    // Set a high scanner timeout so that we're likely to have a chance to scan, even in
-    // high-stress workloads.
-    CHECK_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */));
+    CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_));
     CHECK_OK(scanner.SetFaultTolerant());
 
     int64_t expected_row_count = rows_inserted_.Load();
@@ -205,8 +207,15 @@ void TestWorkload::ReadThread() {
   }
 }
 
-void TestWorkload::Setup() {
+shared_ptr<KuduClient> TestWorkload::CreateClient() {
   CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
+  return client_;
+}
+
+void TestWorkload::Setup() {
+  if (!client_) {
+    CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
+  }
 
   bool table_exists;
 
@@ -286,6 +295,12 @@ void TestWorkload::Start() {
   }
 }
 
+Status TestWorkload::Cleanup() {
+  // Should be run only when the workload is inactive.
+  CHECK(!should_run_.Load() && threads_.empty());
+  return client_->DeleteTable(table_name_);
+}
+
 void TestWorkload::StopAndJoin() {
   should_run_.Store(false);
   start_latch_.Reset(0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index fdab495..dbd848e 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -70,6 +70,10 @@ class TestWorkload {
     client_builder_.default_rpc_timeout(MonoDelta::FromMilliseconds(t));
   }
 
+  void set_read_timeout_millis(int t) {
+    read_timeout_millis_ = t;
+  }
+
   void set_write_timeout_millis(int t) {
     write_timeout_millis_ = t;
   }
@@ -153,6 +157,8 @@ class TestWorkload {
     }
   }
 
+  client::sp::shared_ptr<client::KuduClient> CreateClient();
+
   // Sets up the internal client and creates the table which will be used for
   // writing, if it doesn't already exist.
   void Setup();
@@ -163,6 +169,9 @@ class TestWorkload {
   // Stop the writers and wait for them to exit.
   void StopAndJoin();
 
+  // Delete the table created by the workload.
+  Status Cleanup();
+
   // Return the number of rows inserted so far. This may be called either
   // during or after the write workload.
   int64_t rows_inserted() const {
@@ -178,6 +187,8 @@ class TestWorkload {
     return batches_completed_.Load();
   }
 
+  client::sp::shared_ptr<client::KuduClient> client() const { return client_; }
+
  private:
   void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
   void WriteThread();
@@ -191,6 +202,7 @@ class TestWorkload {
   int payload_bytes_;
   int num_write_threads_;
   int num_read_threads_;
+  int read_timeout_millis_;
   int write_batch_size_;
   int write_timeout_millis_;
   bool timeout_allowed_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index e083658..5d5c0f0 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -98,6 +98,8 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
   switch (error.code()) {
     case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
       return Status::NotAuthorized(code_name, error.message());
+    case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+      return Status::ServiceUnavailable(code_name, error.message());
     default:
       return Status::RuntimeError(code_name, error.message());
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index d64f6be..792b0e6 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -100,9 +100,9 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
           == ErrorStatusPB::ERROR_REQUEST_STALE) {
         return { RetriableRpcStatus::NON_RETRIABLE_ERROR,
               mutable_retrier()->controller().status() };
-      } else {
-        return { RetriableRpcStatus::SERVER_BUSY, mutable_retrier()->controller().status() };
       }
+      return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+               mutable_retrier()->controller().status() };
     }
 
     // If the controller is not finished we're in the ReplicaFoundCb() callback.
@@ -126,10 +126,17 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
       // the same reply back.
       int random = rand() % 4;
       switch (random) {
-        case 0: return { RetriableRpcStatus::SERVER_BUSY, Status::RemoteError("") };
-        case 1: return { RetriableRpcStatus::RESOURCE_NOT_FOUND, Status::RemoteError("") };
-        case 2: return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, Status::RemoteError("") };
-        case 3: return { RetriableRpcStatus::OK, Status::OK() };
+        case 0:
+          return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+                   Status::RemoteError("") };
+        case 1:
+          return { RetriableRpcStatus::RESOURCE_NOT_FOUND,
+                   Status::RemoteError("") };
+        case 2:
+          return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE,
+                   Status::RemoteError("") };
+        case 3:
+          return { RetriableRpcStatus::OK, Status::OK() };
         default: LOG(FATAL) << "Unexpected value";
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 87ca39a..2b1b5ad 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -160,7 +160,7 @@ class OutboundCall {
   // should be set to the error returned by the remote server. Takes
   // ownership of 'err_pb'.
   void SetFailed(const Status& status,
-                 ErrorStatusPB* err_pb = NULL);
+                 ErrorStatusPB* err_pb = nullptr);
 
   // Mark the call as timed out. This also triggers the callback to notify
   // the caller.

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 43c3662..ca39abe 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -136,10 +136,11 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableR
                                                                 Server* server) {
   // Handle the cases where we retry.
   switch (result.result) {
-    // For writes, always retry a TOO_BUSY error on the same server.
-    case RetriableRpcStatus::SERVER_BUSY: {
+    case RetriableRpcStatus::SERVICE_UNAVAILABLE:
+      // For writes, always retry the request on the same server in case of the
+      // SERVICE_UNAVAILABLE error.
       break;
-    }
+
     case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
       // TODO(KUDU-1745): not checking for null here results in a crash, since in the case
       // of a failed master lookup we have no tablet server corresponding to the error.
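
For context, the remainder of RetryIfNeeded (not shown in the hunk)
chooses between retrying on the same server and failing over; roughly (a
simplified sketch; the server-picker call is abbreviated and the exact
member names may differ from the actual source):

    switch (result.result) {
      case RetriableRpcStatus::SERVICE_UNAVAILABLE:
        // Retry on the same server, after a back-off delay.
        break;
      case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE:
        // Mark this replica as failed so the next attempt picks another.
        server_picker_->MarkServerFailed(server, result.status);
        break;
      default:
        return false;  // not a retriable condition
    }
    mutable_retrier()->DelayedRetry(this, result.status);
    return true;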

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.cc b/src/kudu/rpc/rpc.cc
index 64f318d..685da13 100644
--- a/src/kudu/rpc/rpc.cc
+++ b/src/kudu/rpc/rpc.cc
@@ -36,13 +36,16 @@ bool RpcRetrier::HandleResponse(Rpc* rpc, Status* out_status) {
   DCHECK(rpc);
   DCHECK(out_status);
 
-  // Always retry a TOO_BUSY error.
-  Status controller_status = controller_.status();
+  // Always retry TOO_BUSY and UNAVAILABLE errors.
+  const Status controller_status = controller_.status();
   if (controller_status.IsRemoteError()) {
     const ErrorStatusPB* err = controller_.error_response();
     if (err &&
         err->has_code() &&
-        err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
+        (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+         err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+      // The UNAVAILABLE code is a broader counterpart of SERVER_TOO_BUSY.
+      // In both cases it's worth retrying a bit later.
       DelayedRetry(rpc, controller_status);
       return true;
     }
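
DelayedRetry() schedules the retry after a back-off delay; the policy
itself is outside this hunk. As a rough illustration of the 'exponential
back-off' behavior referred to in the integration test comments above
(the function name and constants are illustrative, not Kudu's actual
implementation):

    #include <algorithm>
    #include <cstdlib>

    // Illustrative exponential back-off with jitter: the delay doubles
    // with each attempt and is capped near the ~5 seconds mentioned in
    // the test's sync-point discussion.
    int ComputeBackoffMs(int attempt_num) {
      int delay_ms = 10 * (1 << std::min(attempt_num, 9));  // 10, 20, 40, ...
      delay_ms = std::min(delay_ms, 5000);
      return delay_ms / 2 + rand() % (delay_ms / 2 + 1);  // add jitter
    }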

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.h b/src/kudu/rpc/rpc.h
index 077c8f3..a2f41a2 100644
--- a/src/kudu/rpc/rpc.h
+++ b/src/kudu/rpc/rpc.h
@@ -47,8 +47,12 @@ struct RetriableRpcStatus {
     // reaching the replica or a DNS resolution problem.
     SERVER_NOT_ACCESSIBLE,
 
-    // The server is too busy to serve the request.
-    SERVER_BUSY,
+    // The server received the request but was not ready to serve it right
+    // away. For example, it may have been too busy, or lacked the resources
+    // or information necessary to serve the request, but it anticipates
+    // being ready again soon. In this case it's worth retrying the request
+    // at a later time.
+    SERVICE_UNAVAILABLE,
 
     // For rpc's that are meant only for the leader of a shared resource, when the server
     // we're interacting with is not the leader.
@@ -56,7 +60,7 @@ struct RetriableRpcStatus {
 
     // The server doesn't know the resource we're interacting with. For instance a TabletServer
     // is not part of the config for the tablet we're trying to write to.
-    RESOURCE_NOT_FOUND
+    RESOURCE_NOT_FOUND,
   };
 
   Result result;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index 3bb4e33..ac895fc 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -101,8 +101,8 @@ class RpcContext {
 
   // Respond with an RPC-level error. This typically manifests to the client as
   // a remote error, one whose handling is agnostic to the particulars of the
-  // sent RPC. For example, ERROR_SERVER_TOO_BUSY usually causes the client to
-  // retry the RPC at a later time.
+  // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE
+  // usually cause the client to retry the RPC at a later time.
   //
   // After this method returns, this RpcContext object is destroyed. The request
   // and response protobufs are also destroyed.