You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/04/29 02:44:18 UTC
kudu git commit: [rpc] handling ERROR_UNAVAILABLE RPC error
Repository: kudu
Updated Branches:
refs/heads/master eb79a7183 -> d4cffa503
[rpc] handling ERROR_UNAVAILABLE RPC error
This patch adds handling of the newly introduced ERROR_UNAVAILABLE RPC
error code. The ERROR_UNAVAILABLE error code is a broader counterpart
of ERROR_SERVER_TOO_BUSY.
From the client side, both ERROR_UNAVAILABLE and ERROR_SERVER_TOO_BUSY
codes mean it's worth retrying the call at a later time. To reflect
that, the internal codes {RetriableRpcStatus,ScanRpcStatus}::SERVER_BUSY
are replaced with more generic SERVICE_UNAVAILABLE.
Added an integration test to cover the behavior of the server components
and the Kudu C++ client when the client sends an authn token signed by a TSK
unknown to the master and tablet servers.
Change-Id: I87d780a4ad88c15ceaacfddf6c1b69ed053bb959
Reviewed-on: http://gerrit.cloudera.org:8080/6640
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d4cffa50
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d4cffa50
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d4cffa50
Branch: refs/heads/master
Commit: d4cffa503ae8d913bf3834bcc92f75f80d1f640f
Parents: eb79a71
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Apr 14 18:21:28 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Sat Apr 29 02:43:11 2017 +0000
----------------------------------------------------------------------
src/kudu/client/batcher.cc | 21 +-
src/kudu/client/client-internal.cc | 5 +-
src/kudu/client/client.h | 2 +
src/kudu/client/scanner-internal.cc | 8 +-
src/kudu/client/scanner-internal.h | 8 +-
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../security-unknown-tsk-itest.cc | 431 +++++++++++++++++++
src/kudu/integration-tests/test_workload.cc | 23 +-
src/kudu/integration-tests/test_workload.h | 12 +
src/kudu/rpc/client_negotiation.cc | 2 +
src/kudu/rpc/exactly_once_rpc-test.cc | 19 +-
src/kudu/rpc/outbound_call.h | 2 +-
src/kudu/rpc/retriable_rpc.h | 7 +-
src/kudu/rpc/rpc.cc | 9 +-
src/kudu/rpc/rpc.h | 10 +-
src/kudu/rpc/rpc_context.h | 4 +-
16 files changed, 529 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index a3d9b7a..1586cae 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -30,13 +30,13 @@
#include <glog/logging.h>
#include "kudu/client/callbacks.h"
-#include "kudu/client/client.h"
#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/session-internal.h"
-#include "kudu/client/write_op.h"
#include "kudu/client/write_op-internal.h"
+#include "kudu/client/write_op.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
@@ -49,6 +49,7 @@
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/retriable_rpc.h"
#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/logging.h"
@@ -337,19 +338,25 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
result.status = mutable_retrier()->controller().status();
}
+ // Check for specific RPC errors.
if (result.status.IsRemoteError()) {
const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
- if (err &&
- err->has_code() &&
- err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
- result.result = RetriableRpcStatus::SERVER_BUSY;
+ if (err && err->has_code() &&
+ (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+ err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+ result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
return result;
}
}
+ if (result.status.IsServiceUnavailable()) {
+ result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+ return result;
+ }
+
// Failover to a replica in the event of any network failure or of a DNS resolution problem.
//
- // TODO: This is probably too harsh; some network failures should be
+ // TODO(adar): This is probably too harsh; some network failures should be
// retried on the current replica.
if (result.status.IsNetworkError()) {
result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 1f61e0f..3f45d02 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -186,7 +186,10 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
const ErrorStatusPB* err = rpc.error_response();
if (err &&
err->has_code() &&
- err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
+ (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+ err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+ // The UNAVAILABLE error code is a broader counterpart of the
+ // SERVER_TOO_BUSY. In both cases it's necessary to retry a bit later.
continue;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 33a95de..a52c479 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -52,6 +52,7 @@ namespace kudu {
class ClientStressTest_TestUniqueClientIds_Test;
class LinkedListTester;
class PartitionSchema;
+class SecurityUnknownTskTest;
namespace tools {
class LeaderMasterProxy;
@@ -527,6 +528,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
friend class KuduTable;
friend class KuduTableAlterer;
friend class KuduTableCreator;
+ friend class ::kudu::SecurityUnknownTskTest;
friend class tools::LeaderMasterProxy;
FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds);
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index c804046..8ca16b7 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -83,7 +83,7 @@ Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
bool can_retry = true;
bool backoff = false;
switch (err.result) {
- case ScanRpcStatus::SERVER_BUSY:
+ case ScanRpcStatus::SERVICE_UNAVAILABLE:
backoff = true;
break;
case ScanRpcStatus::RPC_DEADLINE_EXCEEDED:
@@ -173,8 +173,10 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
switch (controller_.error_response()->code()) {
case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST:
return ScanRpcStatus{ScanRpcStatus::INVALID_REQUEST, rpc_status};
- case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
- return ScanRpcStatus{ScanRpcStatus::SERVER_BUSY, rpc_status};
+ case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: // fall-through
+ case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
+ return ScanRpcStatus{
+ ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status};
default:
return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 4292f7e..154ba42 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -46,8 +46,12 @@ struct ScanRpcStatus {
// The request was malformed (e.g. bad schema, etc).
INVALID_REQUEST,
- // The server was busy (e.g. RPC queue overflow).
- SERVER_BUSY,
+ // The server received the request but it was not ready to serve it right
+ // away. It might happen that the server was too busy and did not have
+ // necessary resources or information to serve the request but it
+ // anticipates it should be ready to serve the request really soon, so it's
+ // worth retrying the request at a later time.
+ SERVICE_UNAVAILABLE,
// The deadline for the whole batch was exceeded.
OVERALL_DEADLINE_EXCEEDED,
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 2ef0fd2..d81ad37 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -79,6 +79,7 @@ ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
ADD_KUDU_TEST(security-faults-itest)
ADD_KUDU_TEST(security-itest)
+ADD_KUDU_TEST(security-unknown-tsk-itest)
ADD_KUDU_TEST(table_locations-itest)
ADD_KUDU_TEST(tablet_copy-itest)
ADD_KUDU_TEST(tablet_copy_client_session-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/security-unknown-tsk-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/security-unknown-tsk-itest.cc b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
new file mode 100644
index 0000000..e610b6c
--- /dev/null
+++ b/src/kudu/integration-tests/security-unknown-tsk-itest.cc
@@ -0,0 +1,431 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <boost/none.hpp>
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/schema.h"
+#include "kudu/client/client.h"
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/schema.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/master-test-util.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_bool(rpc_trace_negotiation);
+DECLARE_int32(heartbeat_interval_ms);
+
+using std::atomic;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+namespace kudu {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduError;
+using client::KuduInsert;
+using client::KuduRowResult;
+using client::KuduScanner;
+using client::KuduSchema;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduTableCreator;
+using client::sp::shared_ptr;
+using security::DataFormat;
+using security::PrivateKey;
+using security::SignedTokenPB;
+using security::TokenPB;
+using security::TokenSigner;
+using security::TokenSigningPrivateKeyPB;
+using security::TokenVerifier;
+using security::VerificationResult;
+
+class SecurityUnknownTskTest : public KuduTest {
+ public:
+ SecurityUnknownTskTest()
+ : num_tablet_servers_(3),
+ heartbeat_interval_ms_(100),
+ schema_(client::KuduSchemaFromSchema(CreateKeyValueTestSchema())) {
+
+ // Make the ts->master heartbeat interval shorter to run the test faster.
+ FLAGS_heartbeat_interval_ms = heartbeat_interval_ms_;
+
+ // Within the scope of the same reactor thread, close an already established
+ // idle connection to the server and open a new one upon making another call
+ // to the same server. This is to force authn token verification at each RPC
+ // call: the authn token is verified by the server side during connection
+ // negotiation. This test uses the in-process MiniCluster, this affects Kudu
+ // clients and the server components. In the context of this test, that's
+ // crucial only for the Kudu clients used in the tests.
+ FLAGS_rpc_reopen_outbound_connections = true;
+ }
+
+ void SetUp() override {
+ KuduTest::SetUp();
+
+ MiniClusterOptions opts;
+ opts.num_tablet_servers = num_tablet_servers_;
+ cluster_.reset(new MiniCluster(env_, opts));
+ ASSERT_OK(cluster_->Start());
+ }
+
+ void TearDown() override {
+ cluster_->Shutdown();
+ }
+
+ // Generate custom TSK.
+ Status GenerateTsk(TokenSigningPrivateKeyPB* tsk, int64_t seq_num = 100) {
+ PrivateKey private_key;
+ RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+ string private_key_str_der;
+ RETURN_NOT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+ tsk->set_rsa_key_der(private_key_str_der);
+ // Key sequence number should be high enough to be greater than sequence
+ // numbers of the TSKs generated by the master itself.
+ tsk->set_key_seq_num(seq_num);
+ tsk->set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
+ return Status::OK();
+ }
+
+ // Generate authn token signed by the specified TSK. Use current client's
+ // authn token as a 'template' for the new one signed by the specified TSK.
+ Status GenerateAuthnToken(const shared_ptr<KuduClient>& client,
+ const TokenSigningPrivateKeyPB& tsk,
+ SignedTokenPB* new_signed_token) {
+ // Should be already connected to the cluster.
+ boost::optional<SignedTokenPB> authn_token = client->data_->messenger_->authn_token();
+ if (authn_token == boost::none) {
+ return Status::RuntimeError("client authn token is not set");
+ }
+
+ // 'Copy' the token data. The idea is to remove the signature from the signed
+ // token and sign the token with our custom TSK (see below).
+ TokenSigner* signer = cluster_->mini_master()->master()->token_signer();
+ TokenPB token;
+ const TokenVerifier& verifier = signer->verifier();
+ if (verifier.VerifyTokenSignature(*authn_token, &token) != VerificationResult::VALID) {
+ return Status::RuntimeError("current client authn token is not valid");
+ }
+
+ // Create an authn token, signing it with the custom TSK.
+ if (!token.SerializeToString(new_signed_token->mutable_token_data())) {
+ return Status::RuntimeError("failed to serialize token data");
+ }
+
+ TokenSigner forger(1, 1);
+ RETURN_NOT_OK(forger.ImportKeys({ tsk }));
+ return forger.SignToken(new_signed_token);
+ }
+
+ // Replace client's authn token with the specified one.
+ void ReplaceAuthnToken(KuduClient* client, const SignedTokenPB& token) {
+ client->data_->messenger_->set_authn_token(token);
+ }
+
+ // Import the specified TSK into the master's TokenSigner. Once the TSK is
+ // imported, the master is able to verify authn tokens signed by the TSK.
+ // The tablet servers are able to verify corresponding authn tokens after
+ // they receive TSK from the master with tserver-->master heartbeat response.
+ Status ImportTsk(const TokenSigningPrivateKeyPB& tsk) {
+ TokenSigner* signer = cluster_->mini_master()->master()->token_signer();
+ return signer->ImportKeys({ tsk });
+ }
+
+ protected:
+ const int num_tablet_servers_;
+ const int32_t heartbeat_interval_ms_;
+ const KuduSchema schema_;
+ unique_ptr<MiniCluster> cluster_;
+};
+
+
+// Tablet server sends back ERROR_UNAVAILABLE error code upon connection
+// negotiation if it does not recognize the TSK which the client's authn token
+// is signed with. The client should receive ServiceUnavailable status in that
+// case and retry the operation. This test exercises some common subset of
+// client-->master and client-->tserver RPCs. The test verifies both success
+// and failure scenarios for the selected subset of RPCs.
+TEST_F(SecurityUnknownTskTest, ErrorUnavailableCommonOperations) {
+ const string table_name = "security-unknown-tsk-itest";
+ const int64_t timeout_seconds = 1;
+
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(cluster_->CreateClient(
+ &KuduClientBuilder()
+ .default_admin_operation_timeout(MonoDelta::FromSeconds(timeout_seconds))
+ .default_rpc_timeout(MonoDelta::FromSeconds(timeout_seconds)),
+ &client));
+
+ // Generate our custom TSK.
+ TokenSigningPrivateKeyPB tsk;
+ ASSERT_OK(GenerateTsk(&tsk));
+
+ // Create new authn token, signing it with the custom TSK.
+ SignedTokenPB new_signed_token;
+ ASSERT_OK(GenerateAuthnToken(client, tsk, &new_signed_token));
+
+ // Create and open a table: a proper table handle is necessary for further RPC
+ // calls to the tablet server. The table should consist of multiple tablets
+ // hosted by all available tablet servers, so the insert or scan requests are
+ // sent to all available tablet servers.
+ gscoped_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(table_creator->table_name(table_name)
+ .set_range_partition_columns({ "key" })
+ .add_hash_partitions({ "key" }, num_tablet_servers_)
+ .schema(&schema_)
+ .num_replicas(1)
+ .Create());
+ ASSERT_OK(client->OpenTable(table_name, &table));
+
+ shared_ptr<KuduSession> session = client->NewSession();
+ // We want to send the write batch to the server as soon as it's applied.
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+
+ // Insert a row into the table -- this is to populate the client's metadata
+ // cache so the client won't try to do that later while trying to re-insert the
+ // same data. If not doing that, then the Apply() call for the duplicate
+ // insert would not be sent to the tablet server: instead, it would be sent to
+ // the master server to find out the location of the target tablet. The idea is
+ // to cover client-->tserver RPCs by this test as well.
+ {
+ unique_ptr<KuduInsert> ins(table->NewInsert());
+ ASSERT_OK(ins->mutable_row()->SetInt32(0, -1));
+ ASSERT_OK(ins->mutable_row()->SetInt32(1, -1));
+ ASSERT_OK(session->Apply(ins.release()));
+ }
+
+ // Replace the original authn token with the specially crafted one. From this
+ // point until importing the custom TSK into the master's verifier, the master
+ // and tablet servers should respond with ERROR_UNAVAILABLE because the client
+ // is about to present authn token signed with unknown TSK.
+ ReplaceAuthnToken(client.get(), new_signed_token);
+
+ // Try to create the table again: this time the RPC shall not pass since the
+ // authn token has been replaced (but not because the table already exists).
+ {
+ const Status s = table_creator->table_name(table_name)
+ .set_range_partition_columns({ "key" })
+ .schema(&schema_)
+ .num_replicas(1)
+ .Create();
+ // The client automatically retries on getting ServiceUnavailable from the
+ // master. It will retry in vain until the operation times out.
+ const string err_msg = s.ToString();
+ ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "CreateTable timed out after deadline expired");
+ }
+
+ {
+ shared_ptr<KuduTable> table;
+ const Status s = client->OpenTable(table_name, &table);
+ const string err_msg = s.ToString();
+ ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "GetTableSchema timed out after deadline expired");
+ }
+
+ // Try to insert the same data which has been successfully inserted prior to
+ // replacing the authn token. The idea is to exercise client-->tablet server
+ // path: the meta-cache already contains information on the corresponding
+ // tablet server and the client will try to send RPCs to the tablet server
+ // directly, avoiding calls to the master server which would happen if the
+ // meta-cache did not contain the information on the tablet location.
+ {
+ unique_ptr<KuduInsert> ins(table->NewInsert());
+ ASSERT_OK(ins->mutable_row()->SetInt32(0, -1));
+ ASSERT_OK(ins->mutable_row()->SetInt32(1, -1));
+ const Status s_apply = session->Apply(ins.release());
+ // The error returned is a generic IOError, and the details are provided
+ // by the KuduSession::GetPendingErrors() method.
+ ASSERT_TRUE(s_apply.IsIOError()) << s_apply.ToString();
+ ASSERT_STR_CONTAINS(s_apply.ToString(), "Some errors occurred");
+
+ std::vector<KuduError*> errors;
+ ElementDeleter cleanup(&errors);
+
+ session->GetPendingErrors(&errors, nullptr);
+ ASSERT_EQ(1, errors.size());
+ ASSERT_NE(nullptr, errors[0]);
+ const Status& ps = errors[0]->status();
+ const string err_msg = ps.ToString();
+ // The client automatically retries on getting ServiceUnavailable from the
+ // tablet server. It will retry in vain until the operation times out.
+ ASSERT_TRUE(ps.IsTimedOut()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "Failed to write batch of 1 ops");
+ }
+
+ // Try opening a scanner. This should fail, timing out on retries.
+ {
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+ ASSERT_OK(scanner.SetTimeoutMillis(1000));
+ const Status s = scanner.Open();
+ const string err_msg = s.ToString();
+ ASSERT_TRUE(s.IsTimedOut()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "GetTableLocations");
+ }
+
+ // In a separate thread, import our TSK into the master's TokenSigner. After
+ // importing, the TSK should propagate to the tablet servers and the client
+ // should be able to authenticate using its custom authn token.
+ thread importer(
+ [&]() {
+ SleepFor(MonoDelta::FromMilliseconds(timeout_seconds * 1000 / 5));
+ CHECK_OK(ImportTsk(tsk));
+ });
+
+ // An automatic clean-up to handle failure cases in the code below.
+ auto importer_cleanup = MakeScopedCleanup([&]() {
+ importer.join();
+ });
+
+ // The client should retry its operations until the masters and tablet servers
+ // get the necessary token verification key to verify our custom authn token.
+ for (int i = 0; i < num_tablet_servers_; ++i) {
+ unique_ptr<KuduInsert> ins(table->NewInsert());
+ ASSERT_OK(ins->mutable_row()->SetInt32(0, i));
+ ASSERT_OK(ins->mutable_row()->SetInt32(1, i));
+ ASSERT_OK(session->Apply(ins.release()));
+ }
+
+ // Run a scan to verify the number of inserted rows.
+ EXPECT_EQ(num_tablet_servers_ + 1, client::CountTableRows(table.get()));
+}
+
+// Replace client's authn token while running a workload which includes creating
+// a table, inserting data and reading it back. With a huge number of runs,
+// this gives coverage of ERROR_UNAVAILABLE handling for all RPC calls involved
+// in the workload scenario.
+TEST_F(SecurityUnknownTskTest, ErrorUnavailableDuringWorkload) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ static const int64_t kTimeoutMs = 20 * 1000;
+ int64_t tsk_seq_num = 100;
+ // Targeting the total runtime to be less than 3 minutes, and in most cases
+ // less than 2 minutes. Sometimes a cycle might take two and in rare cases
+ // up to three timeout intervals to complete.
+ for (int i = 0; i < 3; ++i) {
+ TestWorkload w(cluster_.get());
+ w.set_num_tablets(num_tablet_servers_);
+ w.set_num_replicas(1);
+ w.set_num_read_threads(2);
+ w.set_num_write_threads(2);
+ w.set_write_batch_size(4096);
+ w.set_client_default_rpc_timeout_millis(kTimeoutMs);
+ w.set_read_timeout_millis(kTimeoutMs);
+ w.set_write_timeout_millis(kTimeoutMs);
+
+ auto client = w.CreateClient();
+ atomic<bool> importer_do_run(true);
+ thread importer(
+ [&]() {
+ // See below for the explanation.
+ MonoTime sync_point = MonoTime::Now();
+
+ while (importer_do_run) {
+ // Generate our custom TSK.
+ TokenSigningPrivateKeyPB tsk;
+
+ // The master's TokenSigner might be generating TSKs in the background
+ // according to its own schedule. The master's TokenSigner increments
+ // the sequence number by 1 for every new TSK generated. To avoid TSK
+ // sequence number collisions, it's necessary to increment the sequence
+ // number for our custom TSKs more aggressively.
+ tsk_seq_num += 10;
+ CHECK_OK(GenerateTsk(&tsk, tsk_seq_num));
+
+ // Create new authn token, signing it with the custom TSK.
+ SignedTokenPB new_signed_token;
+ CHECK_OK(GenerateAuthnToken(client, tsk, &new_signed_token));
+
+ ReplaceAuthnToken(client.get(), new_signed_token);
+ // From now till the call of ImportTsk() the system is unaware of the
+ // custom TSK key and the token signed with it cannot be verified.
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 5 + 5));
+ CHECK_OK(ImportTsk(tsk));
+
+ // After the ImportTsk() call, the public part of the TSK needs to
+ // reach tablet servers so they could verify the custom authn token.
+ // The delay is more than the minimum required heartbeat_interval_ms_
+ // to allow for completion of pending operations when retrying with
+ // the 'exponential back-off' policy. In addition, some clients might
+ // be long in the retry sequence, not being able to catch up with the
+ // rate of the token replacement: they need to wake up and make the
+ // retry call when current TSK is known to the system. Due to the
+ // exponential back-off algorithm of the retry sequence, there might
+ // be some clients sleeping for up to ~5 seconds between retries.
+ // To avoid timing out in such situations, every timeout interval
+ // a 'sync point' happens, so the long-sleeping clients are able to
+ // complete their operations. The kSyncSleepIntervalMs is a little over
+ // the necessary ~5 seconds to avoid test flakiness on slow VMs.
+ static const int64_t kSyncSleepIntervalMs = 7500;
+ bool is_sync_point = (sync_point + MonoDelta::FromMilliseconds(
+ kTimeoutMs - kSyncSleepIntervalMs) <= MonoTime::Now());
+ const int64_t sleep_time_ms = is_sync_point
+ ? kSyncSleepIntervalMs : 5 * heartbeat_interval_ms_;
+ SleepFor(MonoDelta::FromMilliseconds(sleep_time_ms));
+ if (is_sync_point) {
+ sync_point = MonoTime::Now();
+ }
+ }
+ });
+
+ w.Setup();
+ w.Start();
+
+ // Let the workload run for some time.
+ const int64_t halfRun = kTimeoutMs / 2;
+ SleepFor(MonoDelta::FromMilliseconds(rand() % halfRun + halfRun));
+
+ w.StopAndJoin();
+ CHECK_OK(w.Cleanup());
+
+ importer_do_run = false;
+ importer.join();
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 5716718..5e98806 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -32,6 +32,7 @@
namespace kudu {
+using client::KuduClient;
using client::KuduInsert;
using client::KuduScanBatch;
using client::KuduScanner;
@@ -51,6 +52,9 @@ TestWorkload::TestWorkload(MiniClusterBase* cluster)
payload_bytes_(11),
num_write_threads_(4),
num_read_threads_(0),
+ // Set a high scanner timeout so that we're likely to have a chance to scan, even in
+ // high-stress workloads.
+ read_timeout_millis_(60000),
write_batch_size_(50),
write_timeout_millis_(20000),
timeout_allowed_(false),
@@ -186,9 +190,7 @@ void TestWorkload::ReadThread() {
SleepFor(MonoDelta::FromMilliseconds(150));
KuduScanner scanner(table.get());
- // Set a high scanner timeout so that we're likely to have a chance to scan, even in
- // high-stress workloads.
- CHECK_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */));
+ CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_));
CHECK_OK(scanner.SetFaultTolerant());
int64_t expected_row_count = rows_inserted_.Load();
@@ -205,8 +207,15 @@ void TestWorkload::ReadThread() {
}
}
-void TestWorkload::Setup() {
+shared_ptr<KuduClient> TestWorkload::CreateClient() {
CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
+ return client_;
+}
+
+void TestWorkload::Setup() {
+ if (!client_) {
+ CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
+ }
bool table_exists;
@@ -286,6 +295,12 @@ void TestWorkload::Start() {
}
}
+Status TestWorkload::Cleanup() {
+ // Should be run only when workload is inactive.
+ CHECK(!should_run_.Load() && threads_.empty());
+ return client_->DeleteTable(table_name_);
+}
+
void TestWorkload::StopAndJoin() {
should_run_.Store(false);
start_latch_.Reset(0);
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index fdab495..dbd848e 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -70,6 +70,10 @@ class TestWorkload {
client_builder_.default_rpc_timeout(MonoDelta::FromMilliseconds(t));
}
+ void set_read_timeout_millis(int t) {
+ read_timeout_millis_ = t;
+ }
+
void set_write_timeout_millis(int t) {
write_timeout_millis_ = t;
}
@@ -153,6 +157,8 @@ class TestWorkload {
}
}
+ client::sp::shared_ptr<client::KuduClient> CreateClient();
+
// Sets up the internal client and creates the table which will be used for
// writing, if it doesn't already exist.
void Setup();
@@ -163,6 +169,9 @@ class TestWorkload {
// Stop the writers and wait for them to exit.
void StopAndJoin();
+ // Delete created table, etc.
+ Status Cleanup();
+
// Return the number of rows inserted so far. This may be called either
// during or after the write workload.
int64_t rows_inserted() const {
@@ -178,6 +187,8 @@ class TestWorkload {
return batches_completed_.Load();
}
+ client::sp::shared_ptr<client::KuduClient> client() const { return client_; }
+
private:
void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
void WriteThread();
@@ -191,6 +202,7 @@ class TestWorkload {
int payload_bytes_;
int num_write_threads_;
int num_read_threads_;
+ int read_timeout_millis_;
int write_batch_size_;
int write_timeout_millis_;
bool timeout_allowed_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index e083658..5d5c0f0 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -98,6 +98,8 @@ static Status StatusFromRpcError(const ErrorStatusPB& error) {
switch (error.code()) {
case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
return Status::NotAuthorized(code_name, error.message());
+ case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+ return Status::ServiceUnavailable(code_name, error.message());
default:
return Status::RuntimeError(code_name, error.message());
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index d64f6be..792b0e6 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -100,9 +100,9 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
== ErrorStatusPB::ERROR_REQUEST_STALE) {
return { RetriableRpcStatus::NON_RETRIABLE_ERROR,
mutable_retrier()->controller().status() };
- } else {
- return { RetriableRpcStatus::SERVER_BUSY, mutable_retrier()->controller().status() };
}
+ return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+ mutable_retrier()->controller().status() };
}
// If the controller is not finished we're in the ReplicaFoundCb() callback.
@@ -126,10 +126,17 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
// the same reply back.
int random = rand() % 4;
switch (random) {
- case 0: return { RetriableRpcStatus::SERVER_BUSY, Status::RemoteError("") };
- case 1: return { RetriableRpcStatus::RESOURCE_NOT_FOUND, Status::RemoteError("") };
- case 2: return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, Status::RemoteError("") };
- case 3: return { RetriableRpcStatus::OK, Status::OK() };
+ case 0:
+ return { RetriableRpcStatus::SERVICE_UNAVAILABLE,
+ Status::RemoteError("") };
+ case 1:
+ return { RetriableRpcStatus::RESOURCE_NOT_FOUND,
+ Status::RemoteError("") };
+ case 2:
+ return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE,
+ Status::RemoteError("") };
+ case 3:
+ return { RetriableRpcStatus::OK, Status::OK() };
default: LOG(FATAL) << "Unexpected value";
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 87ca39a..2b1b5ad 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -160,7 +160,7 @@ class OutboundCall {
// should be set to the error returned by the remote server. Takes
// ownership of 'err_pb'.
void SetFailed(const Status& status,
- ErrorStatusPB* err_pb = NULL);
+ ErrorStatusPB* err_pb = nullptr);
// Mark the call as timed out. This also triggers the callback to notify
// the caller.
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/retriable_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/retriable_rpc.h b/src/kudu/rpc/retriable_rpc.h
index 43c3662..ca39abe 100644
--- a/src/kudu/rpc/retriable_rpc.h
+++ b/src/kudu/rpc/retriable_rpc.h
@@ -136,10 +136,11 @@ bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(const RetriableR
Server* server) {
// Handle the cases where we retry.
switch (result.result) {
- // For writes, always retry a TOO_BUSY error on the same server.
- case RetriableRpcStatus::SERVER_BUSY: {
+ case RetriableRpcStatus::SERVICE_UNAVAILABLE:
+ // For writes, always retry the request on the same server in case of the
+ // SERVICE_UNAVAILABLE error.
break;
- }
+
case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE: {
// TODO(KUDU-1745): not checking for null here results in a crash, since in the case
// of a failed master lookup we have no tablet server corresponding to the error.
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.cc b/src/kudu/rpc/rpc.cc
index 64f318d..685da13 100644
--- a/src/kudu/rpc/rpc.cc
+++ b/src/kudu/rpc/rpc.cc
@@ -36,13 +36,16 @@ bool RpcRetrier::HandleResponse(Rpc* rpc, Status* out_status) {
DCHECK(rpc);
DCHECK(out_status);
- // Always retry a TOO_BUSY error.
- Status controller_status = controller_.status();
+ // Always retry TOO_BUSY and UNAVAILABLE errors.
+ const Status controller_status = controller_.status();
if (controller_status.IsRemoteError()) {
const ErrorStatusPB* err = controller_.error_response();
if (err &&
err->has_code() &&
- err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
+ (err->code() == ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+ err->code() == ErrorStatusPB::ERROR_UNAVAILABLE)) {
+ // The UNAVAILABLE code is a broader counterpart of SERVER_TOO_BUSY.
+ // In both cases it's necessary to retry a bit later.
DelayedRetry(rpc, controller_status);
return true;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc.h b/src/kudu/rpc/rpc.h
index 077c8f3..a2f41a2 100644
--- a/src/kudu/rpc/rpc.h
+++ b/src/kudu/rpc/rpc.h
@@ -47,8 +47,12 @@ struct RetriableRpcStatus {
// reaching the replica or a DNS resolution problem.
SERVER_NOT_ACCESSIBLE,
- // The server is too busy to serve the request.
- SERVER_BUSY,
+ // The server received the request but it was not ready to serve it right
+ // away. It might happen that the server was too busy or did not have the
+ // necessary resources or information to serve the request, but it
+ // anticipates it should be ready to serve the request really soon, so it's
+ // worth retrying the request at a later time.
+ SERVICE_UNAVAILABLE,
// For rpc's that are meant only for the leader of a shared resource, when the server
// we're interacting with is not the leader.
@@ -56,7 +60,7 @@ struct RetriableRpcStatus {
// The server doesn't know the resource we're interacting with. For instance a TabletServer
// is not part of the config for the tablet we're trying to write to.
- RESOURCE_NOT_FOUND
+ RESOURCE_NOT_FOUND,
};
Result result;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4cffa50/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index 3bb4e33..ac895fc 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -101,8 +101,8 @@ class RpcContext {
// Respond with an RPC-level error. This typically manifests to the client as
// a remote error, one whose handling is agnostic to the particulars of the
- // sent RPC. For example, ERROR_SERVER_TOO_BUSY usually causes the client to
- // retry the RPC at a later time.
+ // sent RPC. For example, both ERROR_SERVER_TOO_BUSY and ERROR_UNAVAILABLE
+ // usually cause the client to retry the RPC at a later time.
//
// After this method returns, this RpcContext object is destroyed. The request
// and response protobufs are also destroyed.