You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/11/06 22:38:29 UTC
[1/3] kudu git commit: KUDU-1918 Prevent hijacking of scanner IDs
Repository: kudu
Updated Branches:
refs/heads/master 038c6bc05 -> 6ce61d6e6
KUDU-1918 Prevent hijacking of scanner IDs
This makes the scanner remember its RemoteUser, and ensures that when
continuing a scan, the new requestor matches the original requestor.
This prevents one user from somehow obtaining a scanner ID from another
and then "hijacking" the in-progress scan.
This restricts scans, checksum scans, and keep-alive requests.
Change-Id: Ic91fa0ca471bd674e35aa2f8de3806b88ad4b3b4
Reviewed-on: http://gerrit.cloudera.org:8080/6348
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e172df40
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e172df40
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e172df40
Branch: refs/heads/master
Commit: e172df405aef47b1339c9879b835baf69b539f8c
Parents: 038c6bc
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Nov 1 10:49:41 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Tue Nov 6 20:55:31 2018 +0000
----------------------------------------------------------------------
src/kudu/client/client-test.cc | 65 +++++++++++++++++
src/kudu/client/client.h | 1 +
src/kudu/client/scanner-internal.cc | 3 +
src/kudu/client/scanner-internal.h | 3 +
src/kudu/tserver/scanners-test.cc | 25 ++++---
src/kudu/tserver/scanners.cc | 36 +++++++---
src/kudu/tserver/scanners.h | 39 +++++++----
src/kudu/tserver/tablet_server-test-base.cc | 12 ++--
src/kudu/tserver/tablet_server-test-base.h | 4 ++
src/kudu/tserver/tablet_server-test.cc | 88 +++++++++++++++++++++++-
src/kudu/tserver/tablet_service.cc | 46 +++++++++----
src/kudu/tserver/tserver.proto | 3 +
src/kudu/tserver/tserver_path_handlers.cc | 3 +-
13 files changed, 272 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index efcdd68..10a4c6c 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5714,5 +5714,70 @@ TEST_F(ClientTest, TestAuthenticationCredentialsRealUser) {
KuduScanner scanner(client_table_.get());
ASSERT_OK(ScanToStrings(&scanner, &rows));
}
+
+// Test that clients that aren't authenticated as the appropriate user will be
+// unable to hijack a specific scanner ID.
+TEST_F(ClientTest, TestBlockScannerHijackingAttempts) {
+ const string kUser = "token-user";
+ const string kBadGuy = "bad-guy";
+ const string table_name = client_table_->name();
+ FLAGS_user_acl = Substitute("$0,$1", kUser, kBadGuy);
+ cluster_->ShutdownNodes(cluster::ClusterNodes::ALL);
+ ASSERT_OK(cluster_->StartSync());
+
+ // Insert some rows to the table.
+ NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+
+ // First authenticate as a user and create a scanner for the existing table.
+ const auto get_table_as_user = [&] (const string& user, shared_ptr<KuduTable>* table) {
+ KuduClientBuilder client_builder;
+ string authn_creds;
+ AuthenticationCredentialsPB pb;
+ pb.set_real_user(user);
+ ASSERT_TRUE(pb.SerializeToString(&authn_creds));
+ client_builder.import_authentication_credentials(authn_creds);
+
+ // Create the client and table for the user.
+ shared_ptr<KuduClient> user_client;
+ ASSERT_OK(cluster_->CreateClient(&client_builder, &user_client));
+ ASSERT_OK(user_client->OpenTable(table_name, table));
+ };
+
+ shared_ptr<KuduTable> user_table;
+ shared_ptr<KuduTable> bad_guy_table;
+ NO_FATALS(get_table_as_user(kUser, &user_table));
+ NO_FATALS(get_table_as_user(kBadGuy, &bad_guy_table));
+
+ // Test both fault-tolerant scanners and non-fault-tolerant scanners.
+ for (bool fault_tolerance : { true, false }) {
+ // Scan the table as the user to get a scanner ID, and set up a malicious
+ // scanner that will try to hijack that scanner ID. Set an initial batch
+ // size of 0 so the calls to Open() don't buffer any rows.
+ KuduScanner user_scanner(user_table.get());
+ ASSERT_OK(user_scanner.SetBatchSizeBytes(0));
+ KuduScanner bad_guy_scanner(bad_guy_table.get());
+ ASSERT_OK(bad_guy_scanner.SetBatchSizeBytes(0));
+ if (fault_tolerance) {
+ ASSERT_OK(user_scanner.SetFaultTolerant());
+ ASSERT_OK(bad_guy_scanner.SetFaultTolerant());
+ }
+ ASSERT_OK(user_scanner.Open());
+ ASSERT_OK(bad_guy_scanner.Open());
+ const string scanner_id = user_scanner.data_->next_req_.scanner_id();
+ ASSERT_FALSE(scanner_id.empty());
+
+ // Now attempt to get that scanner id as a different user.
+ LOG(INFO) << Substitute("Attempting to extract data from $0 scan $1 as $2",
+ fault_tolerance ? "fault-tolerant" : "non-fault-tolerant", scanner_id, kBadGuy);
+ bad_guy_scanner.data_->next_req_.set_scanner_id(scanner_id);
+ bad_guy_scanner.data_->last_response_.set_has_more_results(true);
+ KuduScanBatch batch;
+ Status s = bad_guy_scanner.NextBatch(&batch);
+ ASSERT_TRUE(s.IsRemoteError());
+ ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+ ASSERT_EQ(0, batch.NumRows());
+ }
+}
+
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 56c3f50..626df7d 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2224,6 +2224,7 @@ class KUDU_EXPORT KuduScanner {
class KUDU_NO_EXPORT Data;
friend class KuduScanToken;
+ FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
FRIEND_TEST(ClientTest, TestScanCloseProxy);
FRIEND_TEST(ClientTest, TestScanFaultTolerance);
FRIEND_TEST(ClientTest, TestScanNoBlockCaching);
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9ac1a86..d7c30bb 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -243,6 +243,9 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
return ScanRpcStatus{
ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status};
+ case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED:
+ return ScanRpcStatus{
+ ScanRpcStatus::SCAN_NOT_AUTHORIZED, rpc_status};
default:
return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 0dddb40..5e7652b 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -89,6 +89,9 @@ struct ScanRpcStatus {
// the token has expired.
RPC_INVALID_AUTHENTICATION_TOKEN,
+ // The requestor was not authorized to make the request.
+ SCAN_NOT_AUTHORIZED,
+
// Another RPC-system error (e.g. NetworkError because the TS was down).
RPC_ERROR,
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc
index 7a2084e..e2456c4 100644
--- a/src/kudu/tserver/scanners-test.cc
+++ b/src/kudu/tserver/scanners-test.cc
@@ -24,42 +24,51 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/scanner_metrics.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
DECLARE_int32(scanner_ttl_ms);
namespace kudu {
+using rpc::RemoteUser;
+using std::vector;
using tablet::TabletReplica;
namespace tserver {
-using std::vector;
+static const char* kUsername = "kudu-user";
TEST(ScannersTest, TestManager) {
scoped_refptr<TabletReplica> null_replica(nullptr);
ScannerManager mgr(nullptr);
// Create two scanners, make sure their ids are different.
+ RemoteUser user;
+ user.SetUnauthenticated(kUsername);
SharedScanner s1, s2;
- mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
- mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+ mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s1);
+ mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s2);
ASSERT_NE(s1->id(), s2->id());
// Check that they're both registered.
SharedScanner result;
- ASSERT_TRUE(mgr.LookupScanner(s1->id(), &result));
+ TabletServerErrorPB::Code error_code;
+ ASSERT_OK(mgr.LookupScanner(s1->id(), kUsername, &error_code, &result));
ASSERT_EQ(result.get(), s1.get());
- ASSERT_TRUE(mgr.LookupScanner(s2->id(), &result));
+ ASSERT_OK(mgr.LookupScanner(s2->id(), kUsername, &error_code, &result));
ASSERT_EQ(result.get(), s2.get());
// Check that looking up a bad scanner returns false.
- ASSERT_FALSE(mgr.LookupScanner("xxx", &result));
+ ASSERT_TRUE(mgr.LookupScanner("xxx", kUsername, &error_code, &result).IsNotFound());
+ ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
// Remove the scanners.
ASSERT_TRUE(mgr.UnregisterScanner(s1->id()));
@@ -75,8 +84,8 @@ TEST(ScannerTest, TestExpire) {
MetricRegistry registry;
ScannerManager mgr(METRIC_ENTITY_server.Instantiate(&registry, "test"));
SharedScanner s1, s2;
- mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
- mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+ mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s1);
+ mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s2);
SleepFor(MonoDelta::FromMilliseconds(200));
s2->UpdateAccessTime();
mgr.RemoveExpiredScanners();
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 6fb9d77..60f69fb 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -34,6 +34,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h"
@@ -68,6 +69,7 @@ using strings::Substitute;
namespace kudu {
+using rpc::RemoteUser;
using tablet::TabletReplica;
namespace tserver {
@@ -131,20 +133,16 @@ ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const str
}
void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica,
- const std::string& requestor_string,
+ const RemoteUser& remote_user,
uint64_t row_format_flags,
SharedScanner* scanner) {
// Keep trying to generate a unique ID until we get one.
bool success = false;
while (!success) {
- // TODO(KUDU-1918): are these UUIDs predictable? If so, we should
- // probably generate random numbers instead, since we can safely
- // just retry until we avoid a collision. Alternatively we could
- // verify that the requestor userid does not change mid-scan.
string id = oid_generator_.Next();
scanner->reset(new Scanner(id,
tablet_replica,
- requestor_string,
+ remote_user,
metrics_.get(),
row_format_flags));
@@ -154,10 +152,26 @@ void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_repli
}
}
-bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) {
+Status ScannerManager::LookupScanner(const string& scanner_id,
+ const string& username,
+ TabletServerErrorPB::Code* error_code,
+ SharedScanner* scanner) {
+ SharedScanner ret;
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
shared_lock<RWMutex> l(stripe.lock_);
- return FindCopy(stripe.scanners_by_id_, scanner_id, scanner);
+ bool found_scanner = FindCopy(stripe.scanners_by_id_, scanner_id, &ret);
+ if (!found_scanner) {
+ *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
+ return Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
+ scanner_id));
+ }
+ if (username != ret->remote_user().username()) {
+ *error_code = TabletServerErrorPB::NOT_AUTHORIZED;
+ return Status::NotAuthorized(Substitute("User $0 doesn't own scanner $1",
+ username, scanner_id));
+ }
+ *scanner = std::move(ret);
+ return Status::OK();
}
bool ScannerManager::UnregisterScanner(const string& scanner_id) {
@@ -290,11 +304,11 @@ void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
const std::string Scanner::kNullTabletId = "null tablet";
Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
- string requestor_string, ScannerMetrics* metrics,
+ RemoteUser remote_user, ScannerMetrics* metrics,
uint64_t row_format_flags)
: id_(std::move(id)),
tablet_replica_(tablet_replica),
- requestor_string_(std::move(requestor_string)),
+ remote_user_(std::move(remote_user)),
call_seq_id_(0),
start_time_(MonoTime::Now()),
metrics_(metrics),
@@ -353,7 +367,7 @@ ScanDescriptor Scanner::descriptor() const {
ScanDescriptor descriptor;
descriptor.tablet_id = tablet_id();
descriptor.scanner_id = id();
- descriptor.requestor = requestor_string();
+ descriptor.remote_user = remote_user();
descriptor.start_time = start_time_;
for (const auto& column : client_projection_schema()->columns()) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 9588053..354ec90 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_TSERVER_SCANNERS_H
-#define KUDU_TSERVER_SCANNERS_H
+#pragma once
#include <cstddef>
#include <cstdint>
@@ -34,7 +33,9 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/auto_release_pool.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/locks.h"
@@ -55,9 +56,11 @@ class Thread;
namespace tserver {
class Scanner;
+
enum class ScanState;
struct ScanDescriptor;
struct ScannerMetrics;
+
typedef std::shared_ptr<Scanner> SharedScanner;
// Manages the live scanners within a Tablet Server.
@@ -76,15 +79,22 @@ class ScannerManager {
// Starts the expired scanner removal thread.
Status StartRemovalThread();
- // Create a new scanner with a unique ID, inserting it into the map.
+ // Create a new scanner with a unique ID, inserting it into the map. Further
+ // lookups for the scanner must provide the username associated with
+ // 'remote_user'.
void NewScanner(const scoped_refptr<tablet::TabletReplica>& tablet_replica,
- const std::string& requestor_string,
+ const rpc::RemoteUser& remote_user,
uint64_t row_format_flags,
SharedScanner* scanner);
- // Lookup the given scanner by its ID.
- // Returns true if the scanner is found successfully.
- bool LookupScanner(const std::string& scanner_id, SharedScanner* scanner);
+ // Lookup the given scanner by its ID with the provided username, setting an
+ // appropriate error code.
+ // Returns NotFound if the scanner doesn't exist, or NotAuthorized if the
+ // scanner wasn't created by 'username'.
+ Status LookupScanner(const std::string& scanner_id,
+ const std::string& username,
+ TabletServerErrorPB::Code* error_code,
+ SharedScanner* scanner);
// Unregister the given scanner by its ID.
// Returns true if unregistered successfully.
@@ -185,7 +195,7 @@ class Scanner {
public:
Scanner(std::string id,
const scoped_refptr<tablet::TabletReplica>& tablet_replica,
- std::string requestor_string, ScannerMetrics* metrics,
+ rpc::RemoteUser remote_user, ScannerMetrics* metrics,
uint64_t row_format_flags);
~Scanner();
@@ -238,7 +248,7 @@ class Scanner {
const scoped_refptr<tablet::TabletReplica>& tablet_replica() const { return tablet_replica_; }
- const std::string& requestor_string() const { return requestor_string_; }
+ const rpc::RemoteUser& remote_user() const { return remote_user_; }
// Returns the current call sequence ID of the scanner.
uint32_t call_seq_id() const {
@@ -322,9 +332,9 @@ class Scanner {
// Tablet associated with the scanner.
const scoped_refptr<tablet::TabletReplica> tablet_replica_;
- // Information about the requestor. Populated from
- // RpcContext::requestor_string().
- const std::string requestor_string_;
+ // The remote user making the request. Populated from the RemoteUser of the
+ // first request.
+ const rpc::RemoteUser remote_user_;
// The last time that the scanner was accessed.
MonoTime last_access_time_;
@@ -392,8 +402,8 @@ struct ScanDescriptor {
// The scanner ID.
std::string scanner_id;
- // The scan requestor.
- std::string requestor;
+ // The user that made the first request.
+ rpc::RemoteUser remote_user;
// The table name.
std::string table_name;
@@ -415,4 +425,3 @@ struct ScanDescriptor {
} // namespace tserver
} // namespace kudu
-#endif
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 924bf5e..59b26e9 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -458,6 +458,12 @@ void TabletServerTestBase::VerifyScanRequestFailure(
}
}
+Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const {
+ scan->set_tablet_id(kTabletId);
+ scan->set_read_mode(read_mode);
+ return SchemaToColumnPBs(schema_, scan->mutable_projected_columns());
+}
+
// Open a new scanner which scans all of the columns in the table.
void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
ReadMode read_mode) {
@@ -465,11 +471,7 @@ void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
RpcController rpc;
// Set up a new request with no predicates, all columns.
- const Schema& projection = schema_;
- NewScanRequestPB* scan = req.mutable_new_scan_request();
- scan->set_tablet_id(kTabletId);
- scan->set_read_mode(read_mode);
- ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+ ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request()));
req.set_call_seq_id(0);
req.set_batch_size_bytes(0); // so it won't return data right away
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 9841021..b80dfac 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -120,6 +120,10 @@ class TabletServerTestBase : public KuduTest {
void OpenScannerWithAllColumns(ScanResponsePB* resp,
ReadMode read_mode = READ_LATEST);
+ // Fills out a new scan request on all of the columns in the table with the
+ // given read mode.
+ Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const;
+
protected:
static const char* kTableId;
static const char* kTabletId;
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3a734ea..a12e99b 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -70,6 +70,7 @@
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/user_credentials.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
@@ -1529,7 +1530,9 @@ TEST_F(TabletServerTest, TestReadLatest) {
ASSERT_TRUE(!scanner_id.empty());
{
SharedScanner junk;
- ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+ TabletServerErrorPB::Code error_code;
+ ASSERT_OK(mini_server_->server()->scanner_manager()->LookupScanner(
+ scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk));
}
// Ensure that the scanner shows up in the server and tablet's metrics.
@@ -1552,7 +1555,10 @@ TEST_F(TabletServerTest, TestReadLatest) {
// from the scanner manager.
{
SharedScanner junk;
- ASSERT_FALSE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+ TabletServerErrorPB::Code error_code;
+ ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(
+ scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk).IsNotFound());
+ ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
}
// Ensure that the metrics have been updated now that the scanner is unregistered.
@@ -3397,5 +3403,83 @@ TEST_F(TabletServerTest, TestKeysInRowsetMetadataPreventStartupSeeks) {
restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ true);
}
+// Test that each scanner can only be accessed by the user who created it.
+TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
+ rpc::UserCredentials user;
+ user.set_real_user("good-guy");
+ proxy_->set_user_credentials(user);
+
+ InsertTestRowsDirect(0, 100);
+ ScanResponsePB resp;
+ NO_FATALS(OpenScannerWithAllColumns(&resp));
+ const string& scanner_id = resp.scanner_id();
+ ASSERT_TRUE(!scanner_id.empty());
+
+ // Now do a checksum scan as the user.
+ string checksum_scanner_id;
+ int64_t checksum_val;
+ {
+ ChecksumRequestPB checksum_req;
+ ChecksumResponsePB checksum_resp;
+ RpcController rpc;
+ ASSERT_OK(FillNewScanRequest(READ_LATEST, checksum_req.mutable_new_request()));
+ // Set a batch size of 0 so we don't return rows and can expect the scanner
+ // to remain alive.
+ checksum_req.set_batch_size_bytes(0);
+ ASSERT_OK(proxy_->Checksum(checksum_req, &checksum_resp, &rpc));
+ SCOPED_TRACE(checksum_resp.DebugString());
+ ASSERT_FALSE(checksum_resp.has_error());
+ ASSERT_TRUE(checksum_resp.has_more_results());
+ checksum_scanner_id = checksum_resp.scanner_id();
+ checksum_val = checksum_resp.checksum();
+ }
+
+ constexpr auto verify_authz_error = [] (const Status& s) {
+ EXPECT_TRUE(s.IsRemoteError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+ };
+
+ for (const string& other : { "", "bad-guy" }) {
+ TabletServerServiceProxy bad_proxy(
+ client_messenger_, mini_server_->bound_rpc_addr(),
+ mini_server_->bound_rpc_addr().host());
+ if (!other.empty()) {
+ rpc::UserCredentials other_user;
+ other_user.set_real_user(other);
+ bad_proxy.set_user_credentials(other_user);
+ }
+ // Other users and clients with no credentials will be bounced for scans,
+ // checksum scans, and keep-alive requests.
+ {
+ ScanRequestPB req;
+ RpcController rpc;
+ req.set_scanner_id(scanner_id);
+ Status s = bad_proxy.Scan(req, &resp, &rpc);
+ SCOPED_TRACE(resp.DebugString());
+ NO_FATALS(verify_authz_error(s));
+ }
+ {
+ ChecksumRequestPB req;
+ ContinueChecksumRequestPB* continue_req = req.mutable_continue_request();
+ continue_req->set_scanner_id(checksum_scanner_id);
+ continue_req->set_previous_checksum(checksum_val);
+ ChecksumResponsePB resp;
+ RpcController rpc;
+ Status s = bad_proxy.Checksum(req, &resp, &rpc);
+ SCOPED_TRACE(resp.DebugString());
+ NO_FATALS(verify_authz_error(s));
+ }
+ for (const string& id : { scanner_id, checksum_scanner_id }) {
+ ScannerKeepAliveRequestPB req;
+ req.set_scanner_id(id);
+ ScannerKeepAliveResponsePB resp;
+ RpcController rpc;
+ Status s = bad_proxy.ScannerKeepAlive(req, &resp, &rpc);
+ SCOPED_TRACE(resp.DebugString());
+ NO_FATALS(verify_authz_error(s));
+ }
+ }
+}
+
} // namespace tserver
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index da4002c..9b3177e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -61,6 +61,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
@@ -367,6 +368,12 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error,
const Status& s,
TabletServerErrorPB::Code code,
rpc::RpcContext* context) {
+ // Non-authorized errors will drop the connection.
+ if (code == TabletServerErrorPB::NOT_AUTHORIZED) {
+ DCHECK(s.IsNotAuthorized());
+ context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED, s);
+ return;
+ }
// Generic "service unavailable" errors will cause the client to retry later.
if ((code == TabletServerErrorPB::UNKNOWN_ERROR ||
code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) {
@@ -1287,14 +1294,23 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
rpc::RpcContext *context) {
DCHECK(req->has_scanner_id());
SharedScanner scanner;
- if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
- resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED);
- Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
- req->scanner_id()));
+ TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+ Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+ context->remote_user().username(),
+ &error_code,
+ &scanner);
+ if (!s.ok()) {
StatusToPB(s, resp->mutable_error()->mutable_status());
LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
s.ToString(), context->requestor_string());
- context->RespondSuccess();
+ if (PREDICT_TRUE(s.IsNotFound())) {
+ resp->mutable_error()->set_code(error_code);
+ StatusToPB(s, resp->mutable_error()->mutable_status());
+ context->RespondSuccess();
+ return;
+ }
+ DCHECK(s.IsNotAuthorized());
+ SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
}
scanner->UpdateAccessTime();
@@ -1545,7 +1561,7 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
ScanResultChecksummer collector;
bool has_more = false;
- TabletServerErrorPB::Code error_code;
+ TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
if (req->has_new_request()) {
scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
const NewScanRequestPB& new_req = req->new_request();
@@ -1814,7 +1830,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
SharedScanner scanner;
server_->scanner_manager()->NewScanner(replica,
- rpc_context->requestor_string(),
+ rpc_context->remote_user(),
scan_pb.row_format_flags(),
&scanner);
TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
@@ -2043,17 +2059,19 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
// in case multiple RPCs hit the same scanner at the same time. Probably
// just a trylock and fail the RPC if it contends.
SharedScanner scanner;
- if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
- if (batch_size_bytes == 0 && req->close_scanner()) {
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+ rpc_context->remote_user().username(),
+ &code,
+ &scanner);
+ if (!s.ok()) {
+ if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) {
// Silently ignore any request to close a non-existent scanner.
return Status::OK();
}
-
- *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
- Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
- req->scanner_id()));
LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
+ *error_code = code;
return s;
}
@@ -2146,7 +2164,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
scoped_refptr<TabletReplica> replica = scanner->tablet_replica();
shared_ptr<Tablet> tablet;
TabletServerErrorPB::Code tablet_ref_error_code;
- const Status s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
+ s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
// If the tablet is not running, but the scan operation in progress
// has reached this point, the tablet server has the necessary data to
// send in response for the scan continuation request.
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index f9f8cef..afe1b71 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -96,6 +96,9 @@ message TabletServerErrorPB {
// The tablet needs to be evicted and reassigned.
TABLET_FAILED = 20;
+
+ // The request is disallowed for the given user.
+ NOT_AUTHORIZED = 21;
}
// The error code.
http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver_path_handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 8fa0196..52ec87f 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -48,6 +48,7 @@
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
#include "kudu/server/webui_util.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.h"
@@ -536,7 +537,7 @@ void ScanToJson(const ScanDescriptor& scan, EasyJson* json) {
json->Set("scanner_id", scan.scanner_id);
json->Set("state", ScanStateToString(scan.state));
json->Set("query", ScanQueryHtml(scan));
- json->Set("requestor", scan.requestor);
+ json->Set("requestor", scan.remote_user.username());
json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds()));
json->Set("time_since_start",
[2/3] kudu git commit: [rebalancer] location-aware rebalancer (part 9/n)
Posted by al...@apache.org.
[rebalancer] location-aware rebalancer (part 9/n)
Updated reporting functionality of the rebalancer tool to output
information on placement policy violations and other relevant
information for location-aware clusters.
Added one simple integration test as well.
Change-Id: I8407e9f8cf6b41a6aeb075372d852125d9739e08
Reviewed-on: http://gerrit.cloudera.org:8080/11862
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8e9345a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8e9345a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8e9345a7
Branch: refs/heads/master
Commit: 8e9345a79849ed3e96a85dc5240e0a4e709b2055
Parents: e172df4
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 26 18:25:24 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Nov 6 21:59:21 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/placement_policy_util-test.cc | 38 +--
src/kudu/tools/placement_policy_util.cc | 2 +-
src/kudu/tools/placement_policy_util.h | 1 -
src/kudu/tools/rebalancer.cc | 332 +++++++++++++++-------
src/kudu/tools/rebalancer.h | 15 +
src/kudu/tools/rebalancer_tool-test.cc | 208 ++++++++++++--
6 files changed, 456 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc
index 7d48cbe..cccd611 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -139,19 +139,19 @@ void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc,
*tpi = std::move(result_tpi);
}
-// TODO(aserbin): is it needed at all?
bool operator==(const PlacementPolicyViolationInfo& lhs,
const PlacementPolicyViolationInfo& rhs) {
return lhs.tablet_id == rhs.tablet_id &&
lhs.majority_location == rhs.majority_location &&
lhs.replicas_num_at_majority_location ==
- rhs.replicas_num_at_majority_location &&
- lhs.replication_factor == rhs.replication_factor;
+ rhs.replicas_num_at_majority_location;
}
ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) {
s << "{tablet_id: " << info.tablet_id
- << ", location: " << info.majority_location << "}";
+ << ", location: " << info.majority_location
+ << ", replicas_num_at_majority_location: "
+ << info.replicas_num_at_majority_location << "}";
return s;
}
@@ -327,7 +327,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "D", {} },
{ "E", {} },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{ { "t0", "C" }, }
},
@@ -345,7 +345,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "B", { "t0", } },
{ "C", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{},
},
@@ -364,7 +364,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
{ "C", { "t0", } },
{ "D", {} },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{ { "t0", "B" }, }
},
};
@@ -390,7 +390,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
{ "D", { "t1", "x1", } }, { "E", { "t1", } },
{ "F", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
{ { "t0", "B" }, { "t1", "E" } }
},
@@ -410,7 +410,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
{ "D", { "t1", "t2", } }, { "E", { "t1", "t3", } },
{ "F", { "t1", "t2", "t3", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
{ { "t0", "B" }, { "t1", "E" } }
},
};
@@ -441,7 +441,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "E", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{},
},
// One RF=7 tablet with the distribution of its replica placement violating
@@ -467,7 +467,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "G", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 4 }, },
{},
},
{
@@ -485,7 +485,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ "D", { "t0", } }, { "E", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{}
},
};
@@ -525,7 +525,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "C", { "t1", } },
{ "D", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L0" }, },
+ { { "t0", "L0", 2 }, { "t1", "L0", 4 }, },
{}
},
{
@@ -541,7 +541,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", } }, { "B", { "t0", } },
{ "D", { "t1", } }, { "E", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -558,7 +558,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", "t1", } }, { "B", { "t0", "t1", } },
{ "D", { "t1", } }, { "E", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -574,7 +574,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "A", { "t0", } }, { "B", { "t0", } },
{ "C", { "t1", } }, { "D", { "t1", } },
},
- { { "t0", "L0" }, { "t1", "L1" }, },
+ { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
{}
},
{
@@ -592,7 +592,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
{ "D", { "t0", } },
{ "F", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 2 }, },
{}
},
};
@@ -616,7 +616,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
{ "D", { "t0", } }, { "F", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L0" }, },
+ { { "t0", "L0", 3 }, },
{ { "t0", "B" }, }
},
{
@@ -635,7 +635,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
{ "G", { "t0", } },
{ "H", { "t0", } },
},
- { { "t0", "L1" }, },
+ { { "t0", "L1", 4 }, },
{ { "t0", "D" }, }
},
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index be1b502..f5ab790 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -333,7 +333,7 @@ Status DetectPlacementPolicyViolations(
tablet_id, max_replicas_num, rep_factor, max_replicas_location);
}
if (is_policy_violated) {
- info.push_back({ tablet_id, max_replicas_location });
+ info.push_back({ tablet_id, max_replicas_location, max_replicas_num });
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h
index e54848d..2938d17 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/tools/placement_policy_util.h
@@ -86,7 +86,6 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
struct PlacementPolicyViolationInfo {
std::string tablet_id;
std::string majority_location;
- int replication_factor;
int replicas_num_at_majority_location;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 4d2d769..46f21f6 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -57,6 +57,7 @@ using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using std::accumulate;
using std::endl;
+using std::back_inserter;
using std::inserter;
using std::ostream;
using std::map;
@@ -69,6 +70,7 @@ using std::shared_ptr;
using std::sort;
using std::string;
using std::to_string;
+using std::transform;
using std::unordered_map;
using std::unordered_set;
using std::vector;
@@ -105,7 +107,7 @@ Rebalancer::Rebalancer(const Config& config)
: config_(config) {
}
-Status Rebalancer::PrintStats(std::ostream& out) {
+Status Rebalancer::PrintStats(ostream& out) {
// First, report on the current balance state of the cluster.
RETURN_NOT_OK(RefreshKsckResults());
const KsckResults& results = ksck_->results();
@@ -116,103 +118,49 @@ Status Rebalancer::PrintStats(std::ostream& out) {
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
- // Per-server replica distribution stats.
- {
- out << "Per-server replica distribution summary:" << endl;
- DataTable summary({"Statistic", "Value"});
-
- const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
- if (servers_load_info.empty()) {
- summary.AddRow({ "N/A", "N/A" });
- } else {
- const int64_t total_replica_count = accumulate(
- servers_load_info.begin(), servers_load_info.end(), 0L,
- [](int64_t sum, const pair<int32_t, string>& elem) {
- return sum + elem.first;
- });
-
- const auto min_replica_count = servers_load_info.begin()->first;
- const auto max_replica_count = servers_load_info.rbegin()->first;
- const double avg_replica_count =
- 1.0 * total_replica_count / servers_load_info.size();
-
- summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
- summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
- summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
- }
- RETURN_NOT_OK(summary.PrintTo(out));
- out << endl;
-
- if (config_.output_replica_distribution_details) {
- const auto& tserver_summaries = results.tserver_summaries;
- unordered_map<string, string> tserver_endpoints;
- for (const auto& summary : tserver_summaries) {
- tserver_endpoints.emplace(summary.uuid, summary.address);
- }
-
- out << "Per-server replica distribution details:" << endl;
- DataTable servers_info({ "UUID", "Address", "Replica Count" });
- for (const auto& elem : servers_load_info) {
- const auto& id = elem.second;
- servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
- }
- RETURN_NOT_OK(servers_info.PrintTo(out));
- out << endl;
- }
+ const auto& ts_id_by_location = ci.locality.servers_by_location;
+ if (ts_id_by_location.empty()) {
+ // Nothing to report about: there are no tablet servers reported.
+ out << "an empty cluster" << endl;
+ return Status::OK();
}
- // Per-table replica distribution stats.
- {
- out << "Per-table replica distribution summary:" << endl;
- DataTable summary({ "Replica Skew", "Value" });
- const auto& table_skew_info = ci.balance.table_info_by_skew;
- if (table_skew_info.empty()) {
- summary.AddRow({ "N/A", "N/A" });
- } else {
- const auto min_table_skew = table_skew_info.begin()->first;
- const auto max_table_skew = table_skew_info.rbegin()->first;
- const int64_t sum_table_skew = accumulate(
- table_skew_info.begin(), table_skew_info.end(), 0L,
- [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
- return sum + elem.first;
- });
- double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
-
- summary.AddRow({ "Minimum", to_string(min_table_skew) });
- summary.AddRow({ "Maximum", to_string(max_table_skew) });
- summary.AddRow({ "Average", to_string(avg_table_skew) });
- }
- RETURN_NOT_OK(summary.PrintTo(out));
- out << endl;
+ if (ts_id_by_location.size() == 1) {
+ // That's about printing information about the whole cluster.
+ return PrintLocationBalanceStats(ts_id_by_location.begin()->first,
+ raw_info, ci, out);
+ }
- if (config_.output_replica_distribution_details) {
- const auto& table_summaries = results.table_summaries;
- unordered_map<string, const KsckTableSummary*> table_info;
- for (const auto& summary : table_summaries) {
- table_info.emplace(summary.id, &summary);
- }
- out << "Per-table replica distribution details:" << endl;
- DataTable skew(
- { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
- for (const auto& elem : table_skew_info) {
- const auto& table_id = elem.second.table_id;
- const auto it = table_info.find(table_id);
- const auto* table_summary =
- (it == table_info.end()) ? nullptr : it->second;
- const auto& table_name = table_summary ? table_summary->name : "";
- const auto total_replica_count = table_summary
- ? table_summary->replication_factor * table_summary->TotalTablets()
- : 0;
- skew.AddRow({ table_id,
- to_string(total_replica_count),
- to_string(elem.first),
- table_name });
- }
- RETURN_NOT_OK(skew.PrintTo(out));
- out << endl;
- }
+ // The stats are more detailed in the case of a multi-location cluster.
+ DCHECK_GT(ts_id_by_location.size(), 1);
+
+ // 1. Print information about cross-location balance.
+ RETURN_NOT_OK(PrintCrossLocationBalanceStats(ci, out));
+
+ // 2. Iterating over locations in the cluster, print per-location balance
+ // information. Since the ts_id_by_location is not sorted, let's first
+ // create a sorted list of locations so the ouput would be sorted by
+ // location.
+ vector<string> locations;
+ locations.reserve(ts_id_by_location.size());
+ transform(ts_id_by_location.cbegin(), ts_id_by_location.cend(),
+ back_inserter(locations),
+ [](const unordered_map<string, set<string>>::value_type& elem) {
+ return elem.first;
+ });
+ sort(locations.begin(), locations.end());
+
+ for (const auto& location : locations) {
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info));
+ ClusterInfo ci;
+ RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+ RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out));
}
+ // 3. Print information about placement policy violations.
+ RETURN_NOT_OK(PrintPolicyViolationInfo(raw_info, out));
+
return Status::OK();
}
@@ -538,6 +486,194 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
return Status::OK();
}
+Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ ostream& out) const {
+ // Print location load information.
+ map<string, int64_t> replicas_num_by_location;
+ for (const auto& elem : ci.balance.servers_by_total_replica_count) {
+ const auto& location = FindOrDie(ci.locality.location_by_ts_id, elem.second);
+ LookupOrEmplace(&replicas_num_by_location, location, 0) += elem.first;
+ }
+ out << "Locations load summary:" << endl;
+ DataTable location_load_summary({"Location", "Load"});
+ for (const auto& elem : replicas_num_by_location) {
+ const auto& location = elem.first;
+ const auto servers_num =
+ FindOrDie(ci.locality.servers_by_location, location).size();
+ CHECK_GT(servers_num, 0);
+ double location_load = static_cast<double>(elem.second) / servers_num;
+ location_load_summary.AddRow({ location, to_string(location_load) });
+ }
+ RETURN_NOT_OK(location_load_summary.PrintTo(out));
+ out << endl;
+
+ return Status::OK();
+}
+
+Status Rebalancer::PrintLocationBalanceStats(const string& location,
+ const ClusterRawInfo& raw_info,
+ const ClusterInfo& ci,
+ ostream& out) const {
+ if (!location.empty()) {
+ out << "--------------------------------------------------" << endl;
+ out << "Location: " << location << endl;
+ out << "--------------------------------------------------" << endl;
+ }
+
+ // Per-server replica distribution stats.
+ {
+ out << "Per-server replica distribution summary:" << endl;
+ DataTable summary({"Statistic", "Value"});
+
+ const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
+ if (servers_load_info.empty()) {
+ summary.AddRow({ "N/A", "N/A" });
+ } else {
+ const int64_t total_replica_count = accumulate(
+ servers_load_info.begin(), servers_load_info.end(), 0L,
+ [](int64_t sum, const pair<int32_t, string>& elem) {
+ return sum + elem.first;
+ });
+
+ const auto min_replica_count = servers_load_info.begin()->first;
+ const auto max_replica_count = servers_load_info.rbegin()->first;
+ const double avg_replica_count =
+ 1.0 * total_replica_count / servers_load_info.size();
+
+ summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
+ summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
+ summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ out << endl;
+
+ if (config_.output_replica_distribution_details) {
+ const auto& tserver_summaries = raw_info.tserver_summaries;
+ unordered_map<string, string> tserver_endpoints;
+ for (const auto& summary : tserver_summaries) {
+ tserver_endpoints.emplace(summary.uuid, summary.address);
+ }
+
+ out << "Per-server replica distribution details:" << endl;
+ DataTable servers_info({ "UUID", "Address", "Replica Count" });
+ for (const auto& elem : servers_load_info) {
+ const auto& id = elem.second;
+ servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+ }
+ RETURN_NOT_OK(servers_info.PrintTo(out));
+ out << endl;
+ }
+ }
+
+ // Per-table replica distribution stats.
+ {
+ out << "Per-table replica distribution summary:" << endl;
+ DataTable summary({ "Replica Skew", "Value" });
+ const auto& table_skew_info = ci.balance.table_info_by_skew;
+ if (table_skew_info.empty()) {
+ summary.AddRow({ "N/A", "N/A" });
+ } else {
+ const auto min_table_skew = table_skew_info.begin()->first;
+ const auto max_table_skew = table_skew_info.rbegin()->first;
+ const int64_t sum_table_skew = accumulate(
+ table_skew_info.begin(), table_skew_info.end(), 0L,
+ [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
+ return sum + elem.first;
+ });
+ double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
+
+ summary.AddRow({ "Minimum", to_string(min_table_skew) });
+ summary.AddRow({ "Maximum", to_string(max_table_skew) });
+ summary.AddRow({ "Average", to_string(avg_table_skew) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ out << endl;
+
+ if (config_.output_replica_distribution_details) {
+ const auto& table_summaries = raw_info.table_summaries;
+ unordered_map<string, const KsckTableSummary*> table_info;
+ for (const auto& summary : table_summaries) {
+ table_info.emplace(summary.id, &summary);
+ }
+ out << "Per-table replica distribution details:" << endl;
+ DataTable skew(
+ { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+ for (const auto& elem : table_skew_info) {
+ const auto& table_id = elem.second.table_id;
+ const auto it = table_info.find(table_id);
+ const auto* table_summary =
+ (it == table_info.end()) ? nullptr : it->second;
+ const auto& table_name = table_summary ? table_summary->name : "";
+ const auto total_replica_count = table_summary
+ ? table_summary->replication_factor * table_summary->TotalTablets()
+ : 0;
+ skew.AddRow({ table_id,
+ to_string(total_replica_count),
+ to_string(elem.first),
+ table_name });
+ }
+ RETURN_NOT_OK(skew.PrintTo(out));
+ out << endl;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ ostream& out) const {
+ TabletsPlacementInfo placement_info;
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+ vector<PlacementPolicyViolationInfo> ppvi;
+ RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+ out << "Placement policy violations:" << endl;
+ if (ppvi.empty()) {
+ out << " none" << endl << endl;;
+ return Status::OK();
+ }
+
+ if (config_.output_replica_distribution_details) {
+ DataTable stats(
+ { "Location", "Table Name", "Tablet", "RF", "Replicas at location" });
+ for (const auto& info : ppvi) {
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+ info.tablet_id);
+ const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+ stats.AddRow({ info.majority_location,
+ table_info.name,
+ info.tablet_id,
+ to_string(table_info.replication_factor),
+ to_string(info.replicas_num_at_majority_location) });
+ }
+ RETURN_NOT_OK(stats.PrintTo(out));
+ } else {
+ DataTable summary({ "Location",
+ "Number of non-complying tables",
+ "Number of non-complying tablets" });
+ typedef pair<unordered_set<string>, unordered_set<string>> TableTabletIds;
+ // Location --> sets of identifiers of tables and tablets hosted by the
+ // tablet servers at the location. The summary is sorted by location.
+ map<string, TableTabletIds> info_by_location;
+ for (const auto& info : ppvi) {
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+ info.tablet_id);
+ auto& elem = LookupOrEmplace(&info_by_location,
+ info.majority_location, TableTabletIds());
+ elem.first.emplace(table_id);
+ elem.second.emplace(info.tablet_id);
+ }
+ for (const auto& elem : info_by_location) {
+ summary.AddRow({ elem.first,
+ to_string(elem.second.first.size()),
+ to_string(elem.second.second.size()) });
+ }
+ RETURN_NOT_OK(summary.PrintTo(out));
+ }
+ out << endl;
+
+ return Status::OK();
+}
+
Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const {
@@ -1084,11 +1220,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
return Status::OK();
}
unordered_set<string> tablets_in_move;
- std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
- inserter(tablets_in_move, tablets_in_move.begin()),
- [](const MovesInProgress::value_type& elem) {
- return elem.first;
- });
+ transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+ inserter(tablets_in_move, tablets_in_move.begin()),
+ [](const MovesInProgress::value_type& elem) {
+ return elem.first;
+ });
for (const auto& move : moves) {
vector<string> tablet_ids;
RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 7bb0d73..cbaef49 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -442,6 +442,21 @@ class Rebalancer {
const TableReplicaMove& move,
std::vector<std::string>* tablet_ids);
+ // Print information on the cross-location balance.
+ Status PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+ std::ostream& out) const;
+
+ // Print statistics for the specified location. If 'location' is an empty
+ // string, that's about printing the cluster-wide stats for a cluster that
+ // doesn't have any locations defined.
+ Status PrintLocationBalanceStats(const std::string& location,
+ const ClusterRawInfo& raw_info,
+ const ClusterInfo& ci,
+ std::ostream& out) const;
+
+ Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+ std::ostream& out) const;
+
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
// The 'moves_in_progress' parameter contains information on the replica moves
http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 60e9f45..9fcd997 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -25,6 +25,7 @@
#include <string>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -88,6 +89,7 @@ using std::thread;
using std::tuple;
using std::unique_ptr;
using std::unordered_map;
+using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -207,28 +209,14 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
ASSERT_STR_MATCHES(err, err_msg_pattern);
}
-// Create tables with unbalanced replica distribution: useful in
-// rebalancer-related tests.
-static Status CreateUnbalancedTables(
+static Status CreateTables(
cluster::ExternalMiniCluster* cluster,
client::KuduClient* client,
const Schema& table_schema,
const string& table_name_pattern,
int num_tables,
int rep_factor,
- int tserver_idx_from,
- int tserver_num,
- int tserver_unresponsive_ms,
vector<string>* table_names = nullptr) {
- // Keep running only some tablet servers and shut down the rest.
- for (auto i = tserver_idx_from; i < tserver_num; ++i) {
- cluster->tablet_server(i)->Shutdown();
- }
-
- // Wait for the catalog manager to understand that not all tablet servers
- // are available.
- SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
-
// Create tables with their tablet replicas landing only on the tablet servers
// which are up and running.
auto client_schema = KuduSchema::FromSchema(table_schema);
@@ -253,6 +241,32 @@ static Status CreateUnbalancedTables(
}
}
+ return Status::OK();
+}
+
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+ cluster::ExternalMiniCluster* cluster,
+ client::KuduClient* client,
+ const Schema& table_schema,
+ const string& table_name_pattern,
+ int num_tables,
+ int rep_factor,
+ int tserver_idx_from,
+ int tserver_num,
+ int tserver_unresponsive_ms,
+ vector<string>* table_names = nullptr) {
+ // Keep running only some tablet servers and shut down the rest.
+ for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+ cluster->tablet_server(i)->Shutdown();
+ }
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+ RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern,
+ num_tables, rep_factor, table_names));
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
}
@@ -404,9 +418,13 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
static const char* const kExitOnSignalStr;
static const char* const kTableNamePattern;
+ // Working around limitations of older libstdc++.
+ static const unordered_set<string> kEmptySet;
+
void Prepare(const vector<string>& extra_tserver_flags = {},
const vector<string>& extra_master_flags = {},
const LocationInfo& location_info = {},
+ const unordered_set<string>& empty_locations = kEmptySet,
vector<string>* created_tables_names = nullptr) {
const auto& scheme_flag = Substitute(
"--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
@@ -420,12 +438,60 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
FLAGS_num_tablet_servers = num_tservers_;
FLAGS_num_replicas = rep_factor_;
- NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
+ NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info,
+ /*create_table=*/ false));
+
+ if (location_info.empty()) {
+ ASSERT_OK(CreateUnbalancedTables(
+ cluster_.get(), client_.get(), schema_, kTableNamePattern,
+ num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+ tserver_unresponsive_ms_, created_tables_names));
+ } else {
+ ASSERT_OK(CreateTablesExcludingLocations(empty_locations,
+ created_tables_names));
+ }
+ }
+
+ // Create tables placing their tablet replicas everywhere but not at the
+ // tablet servers in the specified locations. This is similar to
+ // CreateUnbalancedTables() but the set of tablet servers to avoid is defined
+ // by the set of the specified locations.
+ Status CreateTablesExcludingLocations(
+ const unordered_set<string>& excluded_locations,
+ vector<string>* table_names = nullptr) {
+ // Shutdown all tablet servers in the specified locations so no tablet
+ // replicas would be hosted by those servers.
+ unordered_set<string> seen_locations;
+ if (!excluded_locations.empty()) {
+ for (const auto& elem : tablet_servers_) {
+ auto* ts = elem.second;
+ if (ContainsKey(excluded_locations, ts->location)) {
+ cluster_->tablet_server_by_uuid(ts->uuid())->Shutdown();
+ EmplaceIfNotPresent(&seen_locations, ts->location);
+ }
+ }
+ }
+ // Sanity check: every specified location should have been seen, otherwise
+ // something is wrong with the tablet servers' registration.
+ CHECK_EQ(excluded_locations.size(), seen_locations.size());
+
+ // Wait for the catalog manager to understand that not all tablet servers
+ // are available.
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
+ RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
+ kTableNamePattern, num_tables_, rep_factor_,
+ table_names));
+ // Start tablet servers at the excluded locations.
+ if (!excluded_locations.empty()) {
+ for (const auto& elem : tablet_servers_) {
+ auto* ts = elem.second;
+ if (ContainsKey(excluded_locations, ts->location)) {
+ RETURN_NOT_OK(cluster_->tablet_server_by_uuid(ts->uuid())->Restart());
+ }
+ }
+ }
- ASSERT_OK(CreateUnbalancedTables(
- cluster_.get(), client_.get(), schema_, kTableNamePattern,
- num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
- tserver_unresponsive_ms_, created_tables_names));
+ return Status::OK();
}
// When the rebalancer starts moving replicas, ksck detects corruption
@@ -454,6 +520,7 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
};
const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
+const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>();
typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
@@ -1185,7 +1252,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
vector<string> table_names;
- NO_FATALS(Prepare({}, {}, location_info, &table_names));
+ NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names));
const vector<string> tool_args = {
"cluster",
@@ -1261,5 +1328,104 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
}
}
+class LocationAwareBalanceInfoTest : public RebalancingTest {
+ public:
+ LocationAwareBalanceInfoTest()
+ : RebalancingTest(/*num_tables=*/ 1,
+ /*rep_factor=*/ 3,
+ /*num_tservers=*/ 5) {
+ }
+
+ bool is_343_scheme() const override {
+ // These tests are for the 3-4-3 replica management scheme only.
+ return true;
+ }
+};
+
+// Verify the output of the location-aware rebalancer against a cluster
+// that has multiple locations.
+TEST_F(LocationAwareBalanceInfoTest, ReportOnly) {
+ static const char kReferenceOutput[] =
+ R"***(Locations load summary:
+ Location | Load
+----------+----------
+ /A | 3.000000
+ /B | 3.000000
+ /C | 0.000000
+
+--------------------------------------------------
+Location: /A
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 0
+ Maximum | 0
+ Average | 0.000000
+
+--------------------------------------------------
+Location: /B
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+----------
+ Minimum | 0
+ Maximum | 0
+ Average | 0.000000
+
+--------------------------------------------------
+Location: /C
+--------------------------------------------------
+Per-server replica distribution summary:
+ Statistic | Value
+-----------------------+----------
+ Minimum Replica Count | 0
+ Maximum Replica Count | 0
+ Average Replica Count | 0.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+-------
+ N/A | N/A
+
+Placement policy violations:
+ Location | Number of non-complying tables | Number of non-complying tablets
+----------+--------------------------------+---------------------------------
+ /B | 1 | 3
+)***";
+
+ const LocationInfo location_info = { { "/A", 1 }, { "/B", 2 }, { "/C", 2 }, };
+ NO_FATALS(Prepare({}, {}, location_info, { "/C" }));
+
+ string out;
+ string err;
+ Status s = RunKuduTool({
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--report_only",
+ }, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ // The output should match the reference report.
+ ASSERT_STR_CONTAINS(out, kReferenceOutput);
+ // The actual rebalancing should not run.
+ ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
+ << ToolRunInfo(s, out, err);
+}
+
} // namespace tools
} // namespace kudu
[3/3] kudu git commit: [tools] reference comparison mode for rebalancing algo tests
Posted by al...@apache.org.
[tools] reference comparison mode for rebalancing algo tests
Introduced a comparison mode for the rebalancing algorithms' tests.
With the current rebalancing algorithms, it's natural for contiguous
moves of the same weight to come out in a different order. That's because of:
* Iterating over elements of a hash container keyed by
the weight of a move.
* Randomly choosing among multiple options of the same weight.
This patch adds the MovesOrderingComparison::IGNORE option to one test
configuration of the RebalanceAlgoUnitTest.LocationBalancingSimpleST
scenario. That fixes the test failure on Ubuntu 18.
Change-Id: I8363f013b5bf8caa3e3b967c64eccca95c763a91
Reviewed-on: http://gerrit.cloudera.org:8080/11870
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6ce61d6e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6ce61d6e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6ce61d6e
Branch: refs/heads/master
Commit: 6ce61d6e67f3cb54ed034fda9c5dd443970ea454
Parents: 8e9345a
Author: Alexey Serbin <al...@apache.org>
Authored: Fri Nov 2 12:06:29 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Nov 6 22:28:12 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/rebalance_algo-test.cc | 63 ++++++++++++++++++++++++++++--
1 file changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/6ce61d6e/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
index 3271339..6a5e5a8 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -17,6 +17,7 @@
#include "kudu/tools/rebalance_algo.h"
+#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
@@ -67,6 +68,7 @@ using std::endl;
using std::ostream;
using std::ostringstream;
using std::set;
+using std::sort;
using std::string;
using std::unordered_map;
using std::vector;
@@ -84,6 +86,23 @@ struct TablePerServerReplicas {
const vector<size_t> num_replicas_by_server;
};
+// Whether the order of the moves in the reference results should be verified
+// against the actual moves.
+enum class MovesOrderingComparison {
+ IGNORE,
+ VERIFY,
+};
+struct ReferenceComparisonOptions {
+ // Constructor to initialize the options by default.
+ // NOLINTNEXTLINE(google-explicit-constructor)
+ ReferenceComparisonOptions(MovesOrderingComparison moves_ordering =
+ MovesOrderingComparison::VERIFY)
+ : moves_ordering(moves_ordering) {
+ }
+
+ const MovesOrderingComparison moves_ordering;
+};
+
// Structure to describe rebalancing-related state of the cluster expressively
// enough for the tests.
struct TestClusterConfig {
@@ -104,6 +123,9 @@ struct TestClusterConfig {
// The expected replica movements: the reference output of the algorithm
// to compare with.
const vector<TableReplicaMove> expected_moves;
+
+ // Options controlling how the reference and the actual results are compared.
+ const ReferenceComparisonOptions ref_comparison_options;
};
bool operator==(const TableReplicaMove& lhs, const TableReplicaMove& rhs) {
@@ -201,7 +223,40 @@ void VerifyLocationRebalancingMoves(const TestClusterConfig& cfg) {
LocationBalancingAlgo algo;
ASSERT_OK(algo.GetNextMoves(ci, 0, &moves));
}
- EXPECT_EQ(cfg.expected_moves, moves);
+ switch (cfg.ref_comparison_options.moves_ordering) {
+ case MovesOrderingComparison::IGNORE:
+ {
+ // The case when the order of moves is not important. For the
+ // rebalancing algorithms, it's natural to re-order contiguous moves
+ // of the same weight. This is because of:
+ // a) randomly choosing among multiple options of the same weight
+ // b) iterating over elements of a hash container keyed by the weight
+ // of a move.
+ // Here it's necessary to normalize both the reference and the actual
+ // results before performing element-to-element comparison.
+ vector<TableReplicaMove> ref_moves(cfg.expected_moves);
+ constexpr auto kMovesComparator = [](const TableReplicaMove& lhs,
+ const TableReplicaMove& rhs) {
+ if (lhs.table_id != rhs.table_id) {
+ return lhs.table_id < rhs.table_id;
+ }
+ if (lhs.from != rhs.from) {
+ return lhs.from < rhs.from;
+ }
+ return lhs.to < rhs.to;
+ };
+ sort(ref_moves.begin(), ref_moves.end(), kMovesComparator);
+ sort(moves.begin(), moves.end(), kMovesComparator);
+ EXPECT_EQ(ref_moves, moves);
+ }
+ break;
+ case MovesOrderingComparison::VERIFY:
+ EXPECT_EQ(cfg.expected_moves, moves);
+ break;
+ default:
+ FAIL() << "unexpected reference comparison style";
+ break;
+ }
}
// Is 'cbi' balanced according to the two-dimensional greedy algorithm?
@@ -984,13 +1039,13 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
},
{ "0", "1", "2", },
{ { "A", { 6, 0, 0, } }, },
- // TODO(aserbin): what about ordering?
{
- { "A", "0", "2" },
{ "A", "0", "1" },
+ { "A", "0", "2" },
{ "A", "0", "1" },
{ "A", "0", "2" },
- }
+ },
+ { MovesOrderingComparison::IGNORE }
},
{
{