You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/11/06 22:38:29 UTC

[1/3] kudu git commit: KUDU-1918 Prevent hijacking of scanner IDs

Repository: kudu
Updated Branches:
  refs/heads/master 038c6bc05 -> 6ce61d6e6


KUDU-1918 Prevent hijacking of scanner IDs

This makes the scanner remember its RemoteUser, and ensures that when
continuing a scan, the new requestor matches the original requestor.

This prevents one user from somehow obtaining a scanner ID from another
and then "hijacking" the in-progress scan.

The ownership check applies to scan, checksum scan, and scanner keep-alive
requests that reference an existing scanner ID.

Change-Id: Ic91fa0ca471bd674e35aa2f8de3806b88ad4b3b4
Reviewed-on: http://gerrit.cloudera.org:8080/6348
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e172df40
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e172df40
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e172df40

Branch: refs/heads/master
Commit: e172df405aef47b1339c9879b835baf69b539f8c
Parents: 038c6bc
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Nov 1 10:49:41 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Tue Nov 6 20:55:31 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc              | 65 +++++++++++++++++
 src/kudu/client/client.h                    |  1 +
 src/kudu/client/scanner-internal.cc         |  3 +
 src/kudu/client/scanner-internal.h          |  3 +
 src/kudu/tserver/scanners-test.cc           | 25 ++++---
 src/kudu/tserver/scanners.cc                | 36 +++++++---
 src/kudu/tserver/scanners.h                 | 39 +++++++----
 src/kudu/tserver/tablet_server-test-base.cc | 12 ++--
 src/kudu/tserver/tablet_server-test-base.h  |  4 ++
 src/kudu/tserver/tablet_server-test.cc      | 88 +++++++++++++++++++++++-
 src/kudu/tserver/tablet_service.cc          | 46 +++++++++----
 src/kudu/tserver/tserver.proto              |  3 +
 src/kudu/tserver/tserver_path_handlers.cc   |  3 +-
 13 files changed, 272 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index efcdd68..10a4c6c 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5714,5 +5714,70 @@ TEST_F(ClientTest, TestAuthenticationCredentialsRealUser) {
   KuduScanner scanner(client_table_.get());
   ASSERT_OK(ScanToStrings(&scanner, &rows));
 }
+
+// Test that clients that aren't authenticated as the appropriate user will be
+// unable to hijack a specific scanner ID.
+TEST_F(ClientTest, TestBlockScannerHijackingAttempts) {
+  const string kUser = "token-user";
+  const string kBadGuy = "bad-guy";
+  const string table_name = client_table_->name();
+  FLAGS_user_acl = Substitute("$0,$1", kUser, kBadGuy);
+  cluster_->ShutdownNodes(cluster::ClusterNodes::ALL);
+  ASSERT_OK(cluster_->StartSync());
+
+  // Insert some rows to the table.
+  NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+
+  // First authenticate as a user and create a scanner for the existing table.
+  const auto get_table_as_user = [&] (const string& user, shared_ptr<KuduTable>* table) {
+    KuduClientBuilder client_builder;
+    string authn_creds;
+    AuthenticationCredentialsPB pb;
+    pb.set_real_user(user);
+    ASSERT_TRUE(pb.SerializeToString(&authn_creds));
+    client_builder.import_authentication_credentials(authn_creds);
+
+    // Create the client and table for the user.
+    shared_ptr<KuduClient> user_client;
+    ASSERT_OK(cluster_->CreateClient(&client_builder, &user_client));
+    ASSERT_OK(user_client->OpenTable(table_name, table));
+  };
+
+  shared_ptr<KuduTable> user_table;
+  shared_ptr<KuduTable> bad_guy_table;
+  NO_FATALS(get_table_as_user(kUser, &user_table));
+  NO_FATALS(get_table_as_user(kBadGuy, &bad_guy_table));
+
+  // Test both fault-tolerant scanners and non-fault-tolerant scanners.
+  for (bool fault_tolerance : { true, false }) {
+    // Scan the table as the user to get a scanner ID, and set up a malicious
+    // scanner that will try to hijack that scanner ID. Set an initial batch
+    // size of 0 so the calls to Open() don't buffer any rows.
+    KuduScanner user_scanner(user_table.get());
+    ASSERT_OK(user_scanner.SetBatchSizeBytes(0));
+    KuduScanner bad_guy_scanner(bad_guy_table.get());
+    ASSERT_OK(bad_guy_scanner.SetBatchSizeBytes(0));
+    if (fault_tolerance) {
+      ASSERT_OK(user_scanner.SetFaultTolerant());
+      ASSERT_OK(bad_guy_scanner.SetFaultTolerant());
+    }
+    ASSERT_OK(user_scanner.Open());
+    ASSERT_OK(bad_guy_scanner.Open());
+    const string scanner_id = user_scanner.data_->next_req_.scanner_id();
+    ASSERT_FALSE(scanner_id.empty());
+
+    // Now attempt to get that scanner id as a different user.
+    LOG(INFO) << Substitute("Attempting to extract data from $0 scan $1 as $2",
+        fault_tolerance ? "fault-tolerant" : "non-fault-tolerant", scanner_id, kBadGuy);
+    bad_guy_scanner.data_->next_req_.set_scanner_id(scanner_id);
+    bad_guy_scanner.data_->last_response_.set_has_more_results(true);
+    KuduScanBatch batch;
+    Status s = bad_guy_scanner.NextBatch(&batch);
+    ASSERT_TRUE(s.IsRemoteError());
+    ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+    ASSERT_EQ(0, batch.NumRows());
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 56c3f50..626df7d 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2224,6 +2224,7 @@ class KUDU_EXPORT KuduScanner {
   class KUDU_NO_EXPORT Data;
 
   friend class KuduScanToken;
+  FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
   FRIEND_TEST(ClientTest, TestScanCloseProxy);
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanNoBlockCaching);

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9ac1a86..d7c30bb 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -243,6 +243,9 @@ ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
         case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
           return ScanRpcStatus{
               ScanRpcStatus::SERVICE_UNAVAILABLE, rpc_status};
+        case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED:
+          return ScanRpcStatus{
+              ScanRpcStatus::SCAN_NOT_AUTHORIZED, rpc_status};
         default:
           return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 0dddb40..5e7652b 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -89,6 +89,9 @@ struct ScanRpcStatus {
     // the token has expired.
     RPC_INVALID_AUTHENTICATION_TOKEN,
 
+    // The requestor was not authorized to make the request.
+    SCAN_NOT_AUTHORIZED,
+
     // Another RPC-system error (e.g. NetworkError because the TS was down).
     RPC_ERROR,
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc
index 7a2084e..e2456c4 100644
--- a/src/kudu/tserver/scanners-test.cc
+++ b/src/kudu/tserver/scanners-test.cc
@@ -24,42 +24,51 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/scanner_metrics.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
 
 DECLARE_int32(scanner_ttl_ms);
 
 namespace kudu {
 
+using rpc::RemoteUser;
+using std::vector;
 using tablet::TabletReplica;
 
 namespace tserver {
 
-using std::vector;
+static const char* kUsername = "kudu-user";
 
 TEST(ScannersTest, TestManager) {
   scoped_refptr<TabletReplica> null_replica(nullptr);
   ScannerManager mgr(nullptr);
 
   // Create two scanners, make sure their ids are different.
+  RemoteUser user;
+  user.SetUnauthenticated(kUsername);
   SharedScanner s1, s2;
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+  mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_replica, user, RowFormatFlags::NO_FLAGS, &s2);
   ASSERT_NE(s1->id(), s2->id());
 
   // Check that they're both registered.
   SharedScanner result;
-  ASSERT_TRUE(mgr.LookupScanner(s1->id(), &result));
+  TabletServerErrorPB::Code error_code;
+  ASSERT_OK(mgr.LookupScanner(s1->id(), kUsername, &error_code, &result));
   ASSERT_EQ(result.get(), s1.get());
 
-  ASSERT_TRUE(mgr.LookupScanner(s2->id(), &result));
+  ASSERT_OK(mgr.LookupScanner(s2->id(), kUsername, &error_code, &result));
   ASSERT_EQ(result.get(), s2.get());
 
   // Check that looking up a bad scanner returns false.
-  ASSERT_FALSE(mgr.LookupScanner("xxx", &result));
+  ASSERT_TRUE(mgr.LookupScanner("xxx", kUsername, &error_code, &result).IsNotFound());
+  ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
 
   // Remove the scanners.
   ASSERT_TRUE(mgr.UnregisterScanner(s1->id()));
@@ -75,8 +84,8 @@ TEST(ScannerTest, TestExpire) {
   MetricRegistry registry;
   ScannerManager mgr(METRIC_ENTITY_server.Instantiate(&registry, "test"));
   SharedScanner s1, s2;
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s1);
-  mgr.NewScanner(null_replica, "", RowFormatFlags::NO_FLAGS, &s2);
+  mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s2);
   SleepFor(MonoDelta::FromMilliseconds(200));
   s2->UpdateAccessTime();
   mgr.RemoveExpiredScanners();

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 6fb9d77..60f69fb 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -34,6 +34,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_metrics.h"
@@ -68,6 +69,7 @@ using strings::Substitute;
 
 namespace kudu {
 
+using rpc::RemoteUser;
 using tablet::TabletReplica;
 
 namespace tserver {
@@ -131,20 +133,16 @@ ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const str
 }
 
 void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica,
-                                const std::string& requestor_string,
+                                const RemoteUser& remote_user,
                                 uint64_t row_format_flags,
                                 SharedScanner* scanner) {
   // Keep trying to generate a unique ID until we get one.
   bool success = false;
   while (!success) {
-    // TODO(KUDU-1918): are these UUIDs predictable? If so, we should
-    // probably generate random numbers instead, since we can safely
-    // just retry until we avoid a collision. Alternatively we could
-    // verify that the requestor userid does not change mid-scan.
     string id = oid_generator_.Next();
     scanner->reset(new Scanner(id,
                                tablet_replica,
-                               requestor_string,
+                               remote_user,
                                metrics_.get(),
                                row_format_flags));
 
@@ -154,10 +152,26 @@ void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_repli
   }
 }
 
-bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) {
+Status ScannerManager::LookupScanner(const string& scanner_id,
+                                     const string& username,
+                                     TabletServerErrorPB::Code* error_code,
+                                     SharedScanner* scanner) {
+  SharedScanner ret;
   ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
   shared_lock<RWMutex> l(stripe.lock_);
-  return FindCopy(stripe.scanners_by_id_, scanner_id, scanner);
+  bool found_scanner = FindCopy(stripe.scanners_by_id_, scanner_id, &ret);
+  if (!found_scanner) {
+    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
+    return Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
+                                       scanner_id));
+  }
+  if (username != ret->remote_user().username()) {
+    *error_code = TabletServerErrorPB::NOT_AUTHORIZED;
+    return Status::NotAuthorized(Substitute("User $0 doesn't own scanner $1",
+                                            username, scanner_id));
+  }
+  *scanner = std::move(ret);
+  return Status::OK();
 }
 
 bool ScannerManager::UnregisterScanner(const string& scanner_id) {
@@ -290,11 +304,11 @@ void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
 const std::string Scanner::kNullTabletId = "null tablet";
 
 Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
-                 string requestor_string, ScannerMetrics* metrics,
+                 RemoteUser remote_user, ScannerMetrics* metrics,
                  uint64_t row_format_flags)
     : id_(std::move(id)),
       tablet_replica_(tablet_replica),
-      requestor_string_(std::move(requestor_string)),
+      remote_user_(std::move(remote_user)),
       call_seq_id_(0),
       start_time_(MonoTime::Now()),
       metrics_(metrics),
@@ -353,7 +367,7 @@ ScanDescriptor Scanner::descriptor() const {
   ScanDescriptor descriptor;
   descriptor.tablet_id = tablet_id();
   descriptor.scanner_id = id();
-  descriptor.requestor = requestor_string();
+  descriptor.remote_user = remote_user();
   descriptor.start_time = start_time_;
 
   for (const auto& column : client_projection_schema()->columns()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/scanners.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 9588053..354ec90 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TSERVER_SCANNERS_H
-#define KUDU_TSERVER_SCANNERS_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -34,7 +33,9 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/auto_release_pool.h"
 #include "kudu/util/condition_variable.h"
 #include "kudu/util/locks.h"
@@ -55,9 +56,11 @@ class Thread;
 namespace tserver {
 
 class Scanner;
+
 enum class ScanState;
 struct ScanDescriptor;
 struct ScannerMetrics;
+
 typedef std::shared_ptr<Scanner> SharedScanner;
 
 // Manages the live scanners within a Tablet Server.
@@ -76,15 +79,22 @@ class ScannerManager {
   // Starts the expired scanner removal thread.
   Status StartRemovalThread();
 
-  // Create a new scanner with a unique ID, inserting it into the map.
+  // Create a new scanner with a unique ID, inserting it into the map. Further
+  // lookups for the scanner must provide the username associated with
+  // 'remote_user'.
   void NewScanner(const scoped_refptr<tablet::TabletReplica>& tablet_replica,
-                  const std::string& requestor_string,
+                  const rpc::RemoteUser& remote_user,
                   uint64_t row_format_flags,
                   SharedScanner* scanner);
 
-  // Lookup the given scanner by its ID.
-  // Returns true if the scanner is found successfully.
-  bool LookupScanner(const std::string& scanner_id, SharedScanner* scanner);
+  // Lookup the given scanner by its ID with the provided username, setting an
+  // appropriate error code.
+  // Returns NotFound if the scanner doesn't exist, or NotAuthorized if the
+  // scanner wasn't created by 'username'.
+  Status LookupScanner(const std::string& scanner_id,
+                       const std::string& username,
+                       TabletServerErrorPB::Code* error_code,
+                       SharedScanner* scanner);
 
   // Unregister the given scanner by its ID.
   // Returns true if unregistered successfully.
@@ -185,7 +195,7 @@ class Scanner {
  public:
   Scanner(std::string id,
           const scoped_refptr<tablet::TabletReplica>& tablet_replica,
-          std::string requestor_string, ScannerMetrics* metrics,
+          rpc::RemoteUser remote_user, ScannerMetrics* metrics,
           uint64_t row_format_flags);
   ~Scanner();
 
@@ -238,7 +248,7 @@ class Scanner {
 
   const scoped_refptr<tablet::TabletReplica>& tablet_replica() const { return tablet_replica_; }
 
-  const std::string& requestor_string() const { return requestor_string_; }
+  const rpc::RemoteUser& remote_user() const { return remote_user_; }
 
   // Returns the current call sequence ID of the scanner.
   uint32_t call_seq_id() const {
@@ -322,9 +332,9 @@ class Scanner {
   // Tablet associated with the scanner.
   const scoped_refptr<tablet::TabletReplica> tablet_replica_;
 
-  // Information about the requestor. Populated from
-  // RpcContext::requestor_string().
-  const std::string requestor_string_;
+  // The remote user making the request. Populated from the RemoteUser of the
+  // first request.
+  const rpc::RemoteUser remote_user_;
 
   // The last time that the scanner was accessed.
   MonoTime last_access_time_;
@@ -392,8 +402,8 @@ struct ScanDescriptor {
   // The scanner ID.
   std::string scanner_id;
 
-  // The scan requestor.
-  std::string requestor;
+  // The user that made the first request.
+  rpc::RemoteUser remote_user;
 
   // The table name.
   std::string table_name;
@@ -415,4 +425,3 @@ struct ScanDescriptor {
 } // namespace tserver
 } // namespace kudu
 
-#endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 924bf5e..59b26e9 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -458,6 +458,12 @@ void TabletServerTestBase::VerifyScanRequestFailure(
   }
 }
 
+Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const {
+  scan->set_tablet_id(kTabletId);
+  scan->set_read_mode(read_mode);
+  return SchemaToColumnPBs(schema_, scan->mutable_projected_columns());
+}
+
 // Open a new scanner which scans all of the columns in the table.
 void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
                                                      ReadMode read_mode) {
@@ -465,11 +471,7 @@ void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
   RpcController rpc;
 
   // Set up a new request with no predicates, all columns.
-  const Schema& projection = schema_;
-  NewScanRequestPB* scan = req.mutable_new_scan_request();
-  scan->set_tablet_id(kTabletId);
-  scan->set_read_mode(read_mode);
-  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+  ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request()));
   req.set_call_seq_id(0);
   req.set_batch_size_bytes(0); // so it won't return data right away
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 9841021..b80dfac 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -120,6 +120,10 @@ class TabletServerTestBase : public KuduTest {
   void OpenScannerWithAllColumns(ScanResponsePB* resp,
                                  ReadMode read_mode = READ_LATEST);
 
+  // Fills out a new scan request on all of the columns in the table with the
+  // given read mode.
+  Status FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const;
+
  protected:
   static const char* kTableId;
   static const char* kTabletId;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3a734ea..a12e99b 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -70,6 +70,7 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/user_credentials.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/server/server_base.proxy.h"
@@ -1529,7 +1530,9 @@ TEST_F(TabletServerTest, TestReadLatest) {
   ASSERT_TRUE(!scanner_id.empty());
   {
     SharedScanner junk;
-    ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+    TabletServerErrorPB::Code error_code;
+    ASSERT_OK(mini_server_->server()->scanner_manager()->LookupScanner(
+        scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk));
   }
 
   // Ensure that the scanner shows up in the server and tablet's metrics.
@@ -1552,7 +1555,10 @@ TEST_F(TabletServerTest, TestReadLatest) {
   // from the scanner manager.
   {
     SharedScanner junk;
-    ASSERT_FALSE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
+    TabletServerErrorPB::Code error_code;
+    ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(
+        scanner_id, proxy_->user_credentials().real_user(), &error_code, &junk).IsNotFound());
+    ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, error_code);
   }
 
   // Ensure that the metrics have been updated now that the scanner is unregistered.
@@ -3397,5 +3403,83 @@ TEST_F(TabletServerTest, TestKeysInRowsetMetadataPreventStartupSeeks) {
   restart_server_and_check_bytes_read(/*keys_in_rowset_meta=*/ true);
 }
 
+// Test that each scanner can only be accessed by the user who created it.
+TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
+  rpc::UserCredentials user;
+  user.set_real_user("good-guy");
+  proxy_->set_user_credentials(user);
+
+  InsertTestRowsDirect(0, 100);
+  ScanResponsePB resp;
+  NO_FATALS(OpenScannerWithAllColumns(&resp));
+  const string& scanner_id = resp.scanner_id();
+  ASSERT_TRUE(!scanner_id.empty());
+
+  // Now do a checksum scan as the user.
+  string checksum_scanner_id;
+  int64_t checksum_val;
+  {
+    ChecksumRequestPB checksum_req;
+    ChecksumResponsePB checksum_resp;
+    RpcController rpc;
+    ASSERT_OK(FillNewScanRequest(READ_LATEST, checksum_req.mutable_new_request()));
+    // Set a batch size of 0 so we don't return rows and can expect the scanner
+    // to remain alive.
+    checksum_req.set_batch_size_bytes(0);
+    ASSERT_OK(proxy_->Checksum(checksum_req, &checksum_resp, &rpc));
+    SCOPED_TRACE(checksum_resp.DebugString());
+    ASSERT_FALSE(checksum_resp.has_error());
+    ASSERT_TRUE(checksum_resp.has_more_results());
+    checksum_scanner_id = checksum_resp.scanner_id();
+    checksum_val = checksum_resp.checksum();
+  }
+
+  constexpr auto verify_authz_error = [] (const Status& s) {
+    EXPECT_TRUE(s.IsRemoteError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
+  };
+
+  for (const string& other : { "", "bad-guy" }) {
+    TabletServerServiceProxy bad_proxy(
+        client_messenger_, mini_server_->bound_rpc_addr(),
+        mini_server_->bound_rpc_addr().host());
+    if (!other.empty()) {
+      rpc::UserCredentials other_user;
+      other_user.set_real_user(other);
+      bad_proxy.set_user_credentials(other_user);
+    }
+    // Other users and clients with no credentials will be bounced for scans,
+    // checksum scans, and keep-alive requests.
+    {
+      ScanRequestPB req;
+      RpcController rpc;
+      req.set_scanner_id(scanner_id);
+      Status s = bad_proxy.Scan(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+    {
+      ChecksumRequestPB req;
+      ContinueChecksumRequestPB* continue_req = req.mutable_continue_request();
+      continue_req->set_scanner_id(checksum_scanner_id);
+      continue_req->set_previous_checksum(checksum_val);
+      ChecksumResponsePB resp;
+      RpcController rpc;
+      Status s = bad_proxy.Checksum(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+    for (const string& id : { scanner_id, checksum_scanner_id }) {
+      ScannerKeepAliveRequestPB req;
+      req.set_scanner_id(id);
+      ScannerKeepAliveResponsePB resp;
+      RpcController rpc;
+      Status s = bad_proxy.ScannerKeepAlive(req, &resp, &rpc);
+      SCOPED_TRACE(resp.DebugString());
+      NO_FATALS(verify_authz_error(s));
+    }
+  }
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index da4002c..9b3177e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -61,6 +61,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
@@ -367,6 +368,12 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error,
                                  const Status& s,
                                  TabletServerErrorPB::Code code,
                                  rpc::RpcContext* context) {
+  // Non-authorized errors will drop the connection.
+  if (code == TabletServerErrorPB::NOT_AUTHORIZED) {
+    DCHECK(s.IsNotAuthorized());
+    context->RespondRpcFailure(rpc::ErrorStatusPB::FATAL_UNAUTHORIZED, s);
+    return;
+  }
   // Generic "service unavailable" errors will cause the client to retry later.
   if ((code == TabletServerErrorPB::UNKNOWN_ERROR ||
        code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) {
@@ -1287,14 +1294,23 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
                                          rpc::RpcContext *context) {
   DCHECK(req->has_scanner_id());
   SharedScanner scanner;
-  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
-    resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED);
-    Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
-                                           req->scanner_id()));
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+                                                       context->remote_user().username(),
+                                                       &error_code,
+                                                       &scanner);
+  if (!s.ok()) {
     StatusToPB(s, resp->mutable_error()->mutable_status());
     LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
                             s.ToString(), context->requestor_string());
-    context->RespondSuccess();
+    if (PREDICT_TRUE(s.IsNotFound())) {
+      resp->mutable_error()->set_code(error_code);
+      StatusToPB(s, resp->mutable_error()->mutable_status());
+      context->RespondSuccess();
+      return;
+    }
+    DCHECK(s.IsNotAuthorized());
+    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
     return;
   }
   scanner->UpdateAccessTime();
@@ -1545,7 +1561,7 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
 
   ScanResultChecksummer collector;
   bool has_more = false;
-  TabletServerErrorPB::Code error_code;
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
   if (req->has_new_request()) {
     scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
     const NewScanRequestPB& new_req = req->new_request();
@@ -1814,7 +1830,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
 
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(replica,
-                                         rpc_context->requestor_string(),
+                                         rpc_context->remote_user(),
                                          scan_pb.row_format_flags(),
                                          &scanner);
   TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
@@ -2043,17 +2059,19 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   // in case multiple RPCs hit the same scanner at the same time. Probably
   // just a trylock and fail the RPC if it contends.
   SharedScanner scanner;
-  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
-    if (batch_size_bytes == 0 && req->close_scanner()) {
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
+                                                       rpc_context->remote_user().username(),
+                                                       &code,
+                                                       &scanner);
+  if (!s.ok()) {
+    if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) {
       // Silently ignore any request to close a non-existent scanner.
       return Status::OK();
     }
-
-    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
-    Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
-                                           req->scanner_id()));
     LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
                             s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
+    *error_code = code;
     return s;
   }
 
@@ -2146,7 +2164,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   scoped_refptr<TabletReplica> replica = scanner->tablet_replica();
   shared_ptr<Tablet> tablet;
   TabletServerErrorPB::Code tablet_ref_error_code;
-  const Status s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
+  s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
   // If the tablet is not running, but the scan operation in progress
   // has reached this point, the tablet server has the necessary data to
   // send in response for the scan continuation request.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index f9f8cef..afe1b71 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -96,6 +96,9 @@ message TabletServerErrorPB {
 
     // The tablet needs to be evicted and reassigned.
     TABLET_FAILED = 20;
+
+    // The request is disallowed for the given user.
+    NOT_AUTHORIZED = 21;
   }
 
   // The error code.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e172df40/src/kudu/tserver/tserver_path_handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 8fa0196..52ec87f 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -48,6 +48,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/server/webui_util.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
@@ -536,7 +537,7 @@ void ScanToJson(const ScanDescriptor& scan, EasyJson* json) {
   json->Set("scanner_id", scan.scanner_id);
   json->Set("state", ScanStateToString(scan.state));
   json->Set("query", ScanQueryHtml(scan));
-  json->Set("requestor", scan.requestor);
+  json->Set("requestor", scan.remote_user.username());
 
   json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds()));
   json->Set("time_since_start",


[2/3] kudu git commit: [rebalancer] location-aware rebalancer (part 9/n)

Posted by al...@apache.org.
[rebalancer] location-aware rebalancer (part 9/n)

Updated reporting functionality of the rebalancer tool to output
information on placement policy violations and other relevant
information for location-aware clusters.

Added one simple integration test as well.

Change-Id: I8407e9f8cf6b41a6aeb075372d852125d9739e08
Reviewed-on: http://gerrit.cloudera.org:8080/11862
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8e9345a7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8e9345a7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8e9345a7

Branch: refs/heads/master
Commit: 8e9345a79849ed3e96a85dc5240e0a4e709b2055
Parents: e172df4
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 26 18:25:24 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Nov 6 21:59:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/placement_policy_util-test.cc |  38 +--
 src/kudu/tools/placement_policy_util.cc      |   2 +-
 src/kudu/tools/placement_policy_util.h       |   1 -
 src/kudu/tools/rebalancer.cc                 | 332 +++++++++++++++-------
 src/kudu/tools/rebalancer.h                  |  15 +
 src/kudu/tools/rebalancer_tool-test.cc       | 208 ++++++++++++--
 6 files changed, 456 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc
index 7d48cbe..cccd611 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -139,19 +139,19 @@ void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc,
   *tpi = std::move(result_tpi);
 }
 
-// TODO(aserbin): is it needed at all?
 bool operator==(const PlacementPolicyViolationInfo& lhs,
                 const PlacementPolicyViolationInfo& rhs) {
   return lhs.tablet_id == rhs.tablet_id &&
       lhs.majority_location == rhs.majority_location &&
       lhs.replicas_num_at_majority_location ==
-          rhs.replicas_num_at_majority_location &&
-      lhs.replication_factor == rhs.replication_factor;
+          rhs.replicas_num_at_majority_location;
 }
 
 ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) {
   s << "{tablet_id: " << info.tablet_id
-    << ", location: " << info.majority_location << "}";
+    << ", location: " << info.majority_location
+    << ", replicas_num_at_majority_location: "
+    << info.replicas_num_at_majority_location << "}";
   return s;
 }
 
@@ -327,7 +327,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "D", {} },
         { "E", {} },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       { { "t0", "C" }, }
     },
 
@@ -345,7 +345,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "B", { "t0", } },
         { "C", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       {},
     },
 
@@ -364,7 +364,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) {
         { "C", { "t0", } },
         { "D", {} },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       { { "t0", "B" }, }
     },
   };
@@ -390,7 +390,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
         { "D", { "t1", "x1", } }, { "E", { "t1", } },
         { "F", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
       { { "t0", "B" }, { "t1", "E" } }
     },
 
@@ -410,7 +410,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) {
         { "D", { "t1", "t2", } }, { "E", { "t1", "t3", } },
         { "F", { "t1", "t2", "t3", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 3 }, { "t1", "L1", 2 }, },
       { { "t0", "B" }, { "t1", "E" } }
     },
   };
@@ -441,7 +441,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "E", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       {},
     },
     // One RF=7 tablet with the distribution of its replica placement violating
@@ -467,7 +467,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "G", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 4 }, },
       {},
     },
     {
@@ -485,7 +485,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
         { "D", { "t0", } }, { "E", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       {}
     },
   };
@@ -525,7 +525,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "C", { "t1", } },
         { "D", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L0" }, },
+      { { "t0", "L0", 2 }, { "t1", "L0", 4 }, },
       {}
     },
     {
@@ -541,7 +541,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", } }, { "B", { "t0", } },
         { "D", { "t1", } }, { "E", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -558,7 +558,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", "t1", } }, { "B", { "t0", "t1", } },
         { "D", { "t1", } }, { "E", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -574,7 +574,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "A", { "t0", } }, { "B", { "t0", } },
         { "C", { "t1", } }, { "D", { "t1", } },
       },
-      { { "t0", "L0" }, { "t1", "L1" }, },
+      { { "t0", "L0", 2 }, { "t1", "L1", 2 }, },
       {}
     },
     {
@@ -592,7 +592,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
         { "D", { "t0", } },
         { "F", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 2 }, },
       {}
     },
   };
@@ -616,7 +616,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
         { "D", { "t0", } }, { "F", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L0" }, },
+      { { "t0", "L0", 3 }, },
       { { "t0", "B" }, }
     },
     {
@@ -635,7 +635,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
         { "G", { "t0", } },
         { "H", { "t0", } },
       },
-      { { "t0", "L1" }, },
+      { { "t0", "L1", 4 }, },
       { { "t0", "D" }, }
     },
   };

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index be1b502..f5ab790 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -333,7 +333,7 @@ Status DetectPlacementPolicyViolations(
           tablet_id, max_replicas_num, rep_factor, max_replicas_location);
     }
     if (is_policy_violated) {
-      info.push_back({ tablet_id, max_replicas_location });
+      info.push_back({ tablet_id, max_replicas_location, max_replicas_num });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h
index e54848d..2938d17 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/tools/placement_policy_util.h
@@ -86,7 +86,6 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
 struct PlacementPolicyViolationInfo {
   std::string tablet_id;
   std::string majority_location;
-  int replication_factor;
   int replicas_num_at_majority_location;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 4d2d769..46f21f6 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -57,6 +57,7 @@ using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using std::accumulate;
 using std::endl;
+using std::back_inserter;
 using std::inserter;
 using std::ostream;
 using std::map;
@@ -69,6 +70,7 @@ using std::shared_ptr;
 using std::sort;
 using std::string;
 using std::to_string;
+using std::transform;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
@@ -105,7 +107,7 @@ Rebalancer::Rebalancer(const Config& config)
     : config_(config) {
 }
 
-Status Rebalancer::PrintStats(std::ostream& out) {
+Status Rebalancer::PrintStats(ostream& out) {
   // First, report on the current balance state of the cluster.
   RETURN_NOT_OK(RefreshKsckResults());
   const KsckResults& results = ksck_->results();
@@ -116,103 +118,49 @@ Status Rebalancer::PrintStats(std::ostream& out) {
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
-  // Per-server replica distribution stats.
-  {
-    out << "Per-server replica distribution summary:" << endl;
-    DataTable summary({"Statistic", "Value"});
-
-    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
-    if (servers_load_info.empty()) {
-      summary.AddRow({ "N/A", "N/A" });
-    } else {
-      const int64_t total_replica_count = accumulate(
-          servers_load_info.begin(), servers_load_info.end(), 0L,
-          [](int64_t sum, const pair<int32_t, string>& elem) {
-            return sum + elem.first;
-          });
-
-      const auto min_replica_count = servers_load_info.begin()->first;
-      const auto max_replica_count = servers_load_info.rbegin()->first;
-      const double avg_replica_count =
-          1.0 * total_replica_count / servers_load_info.size();
-
-      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
-      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
-      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
-    }
-    RETURN_NOT_OK(summary.PrintTo(out));
-    out << endl;
-
-    if (config_.output_replica_distribution_details) {
-      const auto& tserver_summaries = results.tserver_summaries;
-      unordered_map<string, string> tserver_endpoints;
-      for (const auto& summary : tserver_summaries) {
-        tserver_endpoints.emplace(summary.uuid, summary.address);
-      }
-
-      out << "Per-server replica distribution details:" << endl;
-      DataTable servers_info({ "UUID", "Address", "Replica Count" });
-      for (const auto& elem : servers_load_info) {
-        const auto& id = elem.second;
-        servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
-      }
-      RETURN_NOT_OK(servers_info.PrintTo(out));
-      out << endl;
-    }
+  const auto& ts_id_by_location = ci.locality.servers_by_location;
+  if (ts_id_by_location.empty()) {
+    // Nothing to report: there are no tablet servers reported.
+    out << "an empty cluster" << endl;
+    return Status::OK();
   }
 
-  // Per-table replica distribution stats.
-  {
-    out << "Per-table replica distribution summary:" << endl;
-    DataTable summary({ "Replica Skew", "Value" });
-    const auto& table_skew_info = ci.balance.table_info_by_skew;
-    if (table_skew_info.empty()) {
-      summary.AddRow({ "N/A", "N/A" });
-    } else {
-      const auto min_table_skew = table_skew_info.begin()->first;
-      const auto max_table_skew = table_skew_info.rbegin()->first;
-      const int64_t sum_table_skew = accumulate(
-          table_skew_info.begin(), table_skew_info.end(), 0L,
-          [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
-            return sum + elem.first;
-          });
-      double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
-
-      summary.AddRow({ "Minimum", to_string(min_table_skew) });
-      summary.AddRow({ "Maximum", to_string(max_table_skew) });
-      summary.AddRow({ "Average", to_string(avg_table_skew) });
-    }
-    RETURN_NOT_OK(summary.PrintTo(out));
-    out << endl;
+  if (ts_id_by_location.size() == 1) {
+    // A single location: print the stats for the whole cluster.
+    return PrintLocationBalanceStats(ts_id_by_location.begin()->first,
+                                     raw_info, ci, out);
+  }
 
-    if (config_.output_replica_distribution_details) {
-      const auto& table_summaries = results.table_summaries;
-      unordered_map<string, const KsckTableSummary*> table_info;
-      for (const auto& summary : table_summaries) {
-        table_info.emplace(summary.id, &summary);
-      }
-      out << "Per-table replica distribution details:" << endl;
-      DataTable skew(
-          { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
-      for (const auto& elem : table_skew_info) {
-        const auto& table_id = elem.second.table_id;
-        const auto it = table_info.find(table_id);
-        const auto* table_summary =
-            (it == table_info.end()) ? nullptr : it->second;
-        const auto& table_name = table_summary ? table_summary->name : "";
-        const auto total_replica_count = table_summary
-            ? table_summary->replication_factor * table_summary->TotalTablets()
-            : 0;
-        skew.AddRow({ table_id,
-                      to_string(total_replica_count),
-                      to_string(elem.first),
-                      table_name });
-      }
-      RETURN_NOT_OK(skew.PrintTo(out));
-      out << endl;
-    }
+  // The stats are more detailed in the case of a multi-location cluster.
+  DCHECK_GT(ts_id_by_location.size(), 1);
+
+  // 1. Print information about cross-location balance.
+  RETURN_NOT_OK(PrintCrossLocationBalanceStats(ci, out));
+
+  // 2. Iterating over locations in the cluster, print per-location balance
+  //    information. Since the ts_id_by_location is not sorted, let's first
+  //    create a sorted list of locations so the output would be sorted by
+  //    location.
+  vector<string> locations;
+  locations.reserve(ts_id_by_location.size());
+  transform(ts_id_by_location.cbegin(), ts_id_by_location.cend(),
+            back_inserter(locations),
+            [](const unordered_map<string, set<string>>::value_type& elem) {
+              return elem.first;
+            });
+  sort(locations.begin(), locations.end());
+
+  for (const auto& location : locations) {
+    ClusterRawInfo raw_info;
+    RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info));
+    ClusterInfo ci;
+    RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+    RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out));
   }
 
+  // 3. Print information about placement policy violations.
+  RETURN_NOT_OK(PrintPolicyViolationInfo(raw_info, out));
+
   return Status::OK();
 }
 
@@ -538,6 +486,194 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
   return Status::OK();
 }
 
+Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+                                                  ostream& out) const {
+  // Print location load information.
+  map<string, int64_t> replicas_num_by_location;
+  for (const auto& elem : ci.balance.servers_by_total_replica_count) {
+    const auto& location = FindOrDie(ci.locality.location_by_ts_id, elem.second);
+    LookupOrEmplace(&replicas_num_by_location, location, 0) += elem.first;
+  }
+  out << "Locations load summary:" << endl;
+  DataTable location_load_summary({"Location", "Load"});
+  for (const auto& elem : replicas_num_by_location) {
+    const auto& location = elem.first;
+    const auto servers_num =
+        FindOrDie(ci.locality.servers_by_location, location).size();
+    CHECK_GT(servers_num, 0);
+    double location_load = static_cast<double>(elem.second) / servers_num;
+    location_load_summary.AddRow({ location, to_string(location_load) });
+  }
+  RETURN_NOT_OK(location_load_summary.PrintTo(out));
+  out << endl;
+
+  return Status::OK();
+}
+
+Status Rebalancer::PrintLocationBalanceStats(const string& location,
+                                             const ClusterRawInfo& raw_info,
+                                             const ClusterInfo& ci,
+                                             ostream& out) const {
+  if (!location.empty()) {
+    out << "--------------------------------------------------" << endl;
+    out << "Location: " << location << endl;
+    out << "--------------------------------------------------" << endl;
+  }
+
+  // Per-server replica distribution stats.
+  {
+    out << "Per-server replica distribution summary:" << endl;
+    DataTable summary({"Statistic", "Value"});
+
+    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
+    if (servers_load_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const int64_t total_replica_count = accumulate(
+          servers_load_info.begin(), servers_load_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, string>& elem) {
+            return sum + elem.first;
+          });
+
+      const auto min_replica_count = servers_load_info.begin()->first;
+      const auto max_replica_count = servers_load_info.rbegin()->first;
+      const double avg_replica_count =
+          1.0 * total_replica_count / servers_load_info.size();
+
+      summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) });
+      summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) });
+      summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& tserver_summaries = raw_info.tserver_summaries;
+      unordered_map<string, string> tserver_endpoints;
+      for (const auto& summary : tserver_summaries) {
+        tserver_endpoints.emplace(summary.uuid, summary.address);
+      }
+
+      out << "Per-server replica distribution details:" << endl;
+      DataTable servers_info({ "UUID", "Address", "Replica Count" });
+      for (const auto& elem : servers_load_info) {
+        const auto& id = elem.second;
+        servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) });
+      }
+      RETURN_NOT_OK(servers_info.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  // Per-table replica distribution stats.
+  {
+    out << "Per-table replica distribution summary:" << endl;
+    DataTable summary({ "Replica Skew", "Value" });
+    const auto& table_skew_info = ci.balance.table_info_by_skew;
+    if (table_skew_info.empty()) {
+      summary.AddRow({ "N/A", "N/A" });
+    } else {
+      const auto min_table_skew = table_skew_info.begin()->first;
+      const auto max_table_skew = table_skew_info.rbegin()->first;
+      const int64_t sum_table_skew = accumulate(
+          table_skew_info.begin(), table_skew_info.end(), 0L,
+          [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) {
+            return sum + elem.first;
+          });
+      double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size();
+
+      summary.AddRow({ "Minimum", to_string(min_table_skew) });
+      summary.AddRow({ "Maximum", to_string(max_table_skew) });
+      summary.AddRow({ "Average", to_string(avg_table_skew) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+    out << endl;
+
+    if (config_.output_replica_distribution_details) {
+      const auto& table_summaries = raw_info.table_summaries;
+      unordered_map<string, const KsckTableSummary*> table_info;
+      for (const auto& summary : table_summaries) {
+        table_info.emplace(summary.id, &summary);
+      }
+      out << "Per-table replica distribution details:" << endl;
+      DataTable skew(
+          { "Table Id", "Replica Count", "Replica Skew", "Table Name" });
+      for (const auto& elem : table_skew_info) {
+        const auto& table_id = elem.second.table_id;
+        const auto it = table_info.find(table_id);
+        const auto* table_summary =
+            (it == table_info.end()) ? nullptr : it->second;
+        const auto& table_name = table_summary ? table_summary->name : "";
+        const auto total_replica_count = table_summary
+            ? table_summary->replication_factor * table_summary->TotalTablets()
+            : 0;
+        skew.AddRow({ table_id,
+                      to_string(total_replica_count),
+                      to_string(elem.first),
+                      table_name });
+      }
+      RETURN_NOT_OK(skew.PrintTo(out));
+      out << endl;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+                                            ostream& out) const {
+  TabletsPlacementInfo placement_info;
+  RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+  vector<PlacementPolicyViolationInfo> ppvi;
+  RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+  out << "Placement policy violations:" << endl;
+  if (ppvi.empty()) {
+    out << "  none" << endl << endl;;
+    return Status::OK();
+  }
+
+  if (config_.output_replica_distribution_details) {
+    DataTable stats(
+        { "Location", "Table Name", "Tablet", "RF", "Replicas at location" });
+    for (const auto& info : ppvi) {
+      const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+                                       info.tablet_id);
+      const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+      stats.AddRow({ info.majority_location,
+                     table_info.name,
+                     info.tablet_id,
+                     to_string(table_info.replication_factor),
+                     to_string(info.replicas_num_at_majority_location) });
+    }
+    RETURN_NOT_OK(stats.PrintTo(out));
+  } else {
+    DataTable summary({ "Location",
+                        "Number of non-complying tables",
+                        "Number of non-complying tablets" });
+    typedef pair<unordered_set<string>, unordered_set<string>> TableTabletIds;
+    // Location --> sets of identifiers of tables and tablets hosted by the
+    // tablet servers at the location. The summary is sorted by location.
+    map<string, TableTabletIds> info_by_location;
+    for (const auto& info : ppvi) {
+      const auto& table_id = FindOrDie(placement_info.tablet_to_table_id,
+                                       info.tablet_id);
+      auto& elem = LookupOrEmplace(&info_by_location,
+                                   info.majority_location, TableTabletIds());
+      elem.first.emplace(table_id);
+      elem.second.emplace(info.tablet_id);
+    }
+    for (const auto& elem : info_by_location) {
+      summary.AddRow({ elem.first,
+                       to_string(elem.second.first.size()),
+                       to_string(elem.second.second.size()) });
+    }
+    RETURN_NOT_OK(summary.PrintTo(out));
+  }
+  out << endl;
+
+  return Status::OK();
+}
+
 Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                                     const MovesInProgress& moves_in_progress,
                                     ClusterInfo* info) const {
@@ -1084,11 +1220,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
     return Status::OK();
   }
   unordered_set<string> tablets_in_move;
-  std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
-                 inserter(tablets_in_move, tablets_in_move.begin()),
-                 [](const MovesInProgress::value_type& elem) {
-                   return elem.first;
-                 });
+  transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+            inserter(tablets_in_move, tablets_in_move.begin()),
+            [](const MovesInProgress::value_type& elem) {
+              return elem.first;
+            });
   for (const auto& move : moves) {
     vector<string> tablet_ids;
     RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 7bb0d73..cbaef49 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -442,6 +442,21 @@ class Rebalancer {
       const TableReplicaMove& move,
       std::vector<std::string>* tablet_ids);
 
+  // Print information on the cross-location balance.
+  Status PrintCrossLocationBalanceStats(const ClusterInfo& ci,
+                                        std::ostream& out) const;
+
+  // Print statistics for the specified location. If 'location' is an empty
+  // string, print the cluster-wide stats for a cluster that doesn't have
+  // any locations defined.
+  Status PrintLocationBalanceStats(const std::string& location,
+                                   const ClusterRawInfo& raw_info,
+                                   const ClusterInfo& ci,
+                                   std::ostream& out) const;
+
+  Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
+                                  std::ostream& out) const;
+
   // Convert the 'raw' information about the cluster into information suitable
   // for the input of the high-level rebalancing algorithm.
   // The 'moves_in_progress' parameter contains information on the replica moves

http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 60e9f45..9fcd997 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <thread>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -88,6 +89,7 @@ using std::thread;
 using std::tuple;
 using std::unique_ptr;
 using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -207,28 +209,14 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
   ASSERT_STR_MATCHES(err, err_msg_pattern);
 }
 
-// Create tables with unbalanced replica distribution: useful in
-// rebalancer-related tests.
-static Status CreateUnbalancedTables(
+static Status CreateTables(
     cluster::ExternalMiniCluster* cluster,
     client::KuduClient* client,
     const Schema& table_schema,
     const string& table_name_pattern,
     int num_tables,
     int rep_factor,
-    int tserver_idx_from,
-    int tserver_num,
-    int tserver_unresponsive_ms,
     vector<string>* table_names = nullptr) {
-  // Keep running only some tablet servers and shut down the rest.
-  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
-    cluster->tablet_server(i)->Shutdown();
-  }
-
-  // Wait for the catalog manager to understand that not all tablet servers
-  // are available.
-  SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
-
   // Create tables with their tablet replicas landing only on the tablet servers
   // which are up and running.
   auto client_schema = KuduSchema::FromSchema(table_schema);
@@ -253,6 +241,32 @@ static Status CreateUnbalancedTables(
     }
   }
 
+  return Status::OK();
+}
+
+// Create tables with unbalanced replica distribution: useful in
+// rebalancer-related tests.
+static Status CreateUnbalancedTables(
+    cluster::ExternalMiniCluster* cluster,
+    client::KuduClient* client,
+    const Schema& table_schema,
+    const string& table_name_pattern,
+    int num_tables,
+    int rep_factor,
+    int tserver_idx_from,
+    int tserver_num,
+    int tserver_unresponsive_ms,
+    vector<string>* table_names = nullptr) {
+  // Keep running only some tablet servers and shut down the rest.
+  for (auto i = tserver_idx_from; i < tserver_num; ++i) {
+    cluster->tablet_server(i)->Shutdown();
+  }
+
+  // Wait for the catalog manager to understand that not all tablet servers
+  // are available.
+  SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
+  RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern,
+                             num_tables, rep_factor, table_names));
   for (auto i = tserver_idx_from; i < tserver_num; ++i) {
     RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
   }
@@ -404,9 +418,13 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
   static const char* const kExitOnSignalStr;
   static const char* const kTableNamePattern;
 
+  // Working around limitations of older libstdc++.
+  static const unordered_set<string> kEmptySet;
+
   void Prepare(const vector<string>& extra_tserver_flags = {},
                const vector<string>& extra_master_flags = {},
                const LocationInfo& location_info = {},
+               const unordered_set<string>& empty_locations = kEmptySet,
                vector<string>* created_tables_names = nullptr) {
     const auto& scheme_flag = Substitute(
         "--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
@@ -420,12 +438,60 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
 
     FLAGS_num_tablet_servers = num_tservers_;
     FLAGS_num_replicas = rep_factor_;
-    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
+    NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info,
+                            /*create_table=*/ false));
+
+    if (location_info.empty()) {
+      ASSERT_OK(CreateUnbalancedTables(
+          cluster_.get(), client_.get(), schema_, kTableNamePattern,
+          num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
+          tserver_unresponsive_ms_, created_tables_names));
+    } else {
+      ASSERT_OK(CreateTablesExcludingLocations(empty_locations,
+                                               created_tables_names));
+    }
+  }
+
+  // Create tables placing their tablet replicas everywhere but not at the
+  // tablet servers in the specified locations. This is similar to
+  // CreateUnbalancedTables() but the set of tablet servers to avoid is defined
+  // by the set of the specified locations.
+  Status CreateTablesExcludingLocations(
+      const unordered_set<string>& excluded_locations,
+      vector<string>* table_names = nullptr) {
+    // Shutdown all tablet servers in the specified locations so no tablet
+    // replicas would be hosted by those servers.
+    unordered_set<string> seen_locations;
+    if (!excluded_locations.empty()) {
+      for (const auto& elem : tablet_servers_) {
+        auto* ts = elem.second;
+        if (ContainsKey(excluded_locations, ts->location)) {
+          cluster_->tablet_server_by_uuid(ts->uuid())->Shutdown();
+          EmplaceIfNotPresent(&seen_locations, ts->location);
+        }
+      }
+    }
+    // Sanity check: every specified location should have been seen, otherwise
+    // something is wrong with the tablet servers' registration.
+    CHECK_EQ(excluded_locations.size(), seen_locations.size());
+
+    // Wait for the catalog manager to understand that not all tablet servers
+    // are available.
+    SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4));
+    RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_,
+                               kTableNamePattern, num_tables_, rep_factor_,
+                               table_names));
+    // Start tablet servers at the excluded locations.
+    if (!excluded_locations.empty()) {
+      for (const auto& elem : tablet_servers_) {
+        auto* ts = elem.second;
+        if (ContainsKey(excluded_locations, ts->location)) {
+          RETURN_NOT_OK(cluster_->tablet_server_by_uuid(ts->uuid())->Restart());
+        }
+      }
+    }
 
-    ASSERT_OK(CreateUnbalancedTables(
-        cluster_.get(), client_.get(), schema_, kTableNamePattern,
-        num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
-        tserver_unresponsive_ms_, created_tables_names));
+    return Status::OK();
   }
 
   // When the rebalancer starts moving replicas, ksck detects corruption
@@ -454,6 +520,7 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
 };
 const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
 const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
+const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>();
 
 typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
 
@@ -1185,7 +1252,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
 
   const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
   vector<string> table_names;
-  NO_FATALS(Prepare({}, {}, location_info, &table_names));
+  NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names));
 
   const vector<string> tool_args = {
     "cluster",
@@ -1261,5 +1328,104 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
   }
 }
 
+class LocationAwareBalanceInfoTest : public RebalancingTest {
+ public:
+  LocationAwareBalanceInfoTest()
+      : RebalancingTest(/*num_tables=*/ 1,
+                        /*rep_factor=*/ 3,
+                        /*num_tservers=*/ 5) {
+  }
+
+  bool is_343_scheme() const override {
+    // These tests are for the 3-4-3 replica management scheme only.
+    return true;
+  }
+};
+
+// Verify the output of the location-aware rebalancer against a cluster
+// that has multiple locations.
+TEST_F(LocationAwareBalanceInfoTest, ReportOnly) {
+  static const char kReferenceOutput[] =
+    R"***(Locations load summary:
+ Location |   Load
+----------+----------
+ /A       | 3.000000
+ /B       | 3.000000
+ /C       | 0.000000
+
+--------------------------------------------------
+Location: /A
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew |  Value
+--------------+----------
+ Minimum      | 0
+ Maximum      | 0
+ Average      | 0.000000
+
+--------------------------------------------------
+Location: /B
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 3
+ Maximum Replica Count | 3
+ Average Replica Count | 3.000000
+
+Per-table replica distribution summary:
+ Replica Skew |  Value
+--------------+----------
+ Minimum      | 0
+ Maximum      | 0
+ Average      | 0.000000
+
+--------------------------------------------------
+Location: /C
+--------------------------------------------------
+Per-server replica distribution summary:
+       Statistic       |  Value
+-----------------------+----------
+ Minimum Replica Count | 0
+ Maximum Replica Count | 0
+ Average Replica Count | 0.000000
+
+Per-table replica distribution summary:
+ Replica Skew | Value
+--------------+-------
+ N/A          | N/A
+
+Placement policy violations:
+ Location | Number of non-complying tables | Number of non-complying tablets
+----------+--------------------------------+---------------------------------
+ /B       | 1                              | 3
+)***";
+
+  const LocationInfo location_info = { { "/A", 1 }, { "/B", 2 }, { "/C", 2 }, };
+  NO_FATALS(Prepare({}, {}, location_info, { "/C" }));
+
+  string out;
+  string err;
+  Status s = RunKuduTool({
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--report_only",
+  }, &out, &err);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  // The output should match the reference report.
+  ASSERT_STR_CONTAINS(out, kReferenceOutput);
+  // The actual rebalancing should not run.
+  ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
+      << ToolRunInfo(s, out, err);
+}
+
 } // namespace tools
 } // namespace kudu


[3/3] kudu git commit: [tools] reference comparison mode for rebalancing algo tests

Posted by al...@apache.org.
[tools] reference comparison mode for rebalancing algo tests

Introduced a comparison mode for the rebalancing algorithms' tests.

For current rebalancing algorithms, it's natural to re-order contiguous
moves of the same weight. That's because of:
  * Iterating over elements of a hash container keyed by
    the weight of a move.
  * Randomly choosing among multiple options of the same weight.

This patch adds the MovesOrderingComparison::IGNORE option to one test
configuration of the RebalanceAlgoUnitTest.LocationBalancingSimpleST
scenario. That fixes the breakage of the test on Ubuntu 18.

Change-Id: I8363f013b5bf8caa3e3b967c64eccca95c763a91
Reviewed-on: http://gerrit.cloudera.org:8080/11870
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6ce61d6e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6ce61d6e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6ce61d6e

Branch: refs/heads/master
Commit: 6ce61d6e67f3cb54ed034fda9c5dd443970ea454
Parents: 8e9345a
Author: Alexey Serbin <al...@apache.org>
Authored: Fri Nov 2 12:06:29 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Nov 6 22:28:12 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/rebalance_algo-test.cc | 63 ++++++++++++++++++++++++++++--
 1 file changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6ce61d6e/src/kudu/tools/rebalance_algo-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalance_algo-test.cc b/src/kudu/tools/rebalance_algo-test.cc
index 3271339..6a5e5a8 100644
--- a/src/kudu/tools/rebalance_algo-test.cc
+++ b/src/kudu/tools/rebalance_algo-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tools/rebalance_algo.h"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <iostream>
@@ -67,6 +68,7 @@ using std::endl;
 using std::ostream;
 using std::ostringstream;
 using std::set;
+using std::sort;
 using std::string;
 using std::unordered_map;
 using std::vector;
@@ -84,6 +86,23 @@ struct TablePerServerReplicas {
   const vector<size_t> num_replicas_by_server;
 };
 
+// Whether the order of the moves in the reference results should be verified
+// against the actual moves.
+enum class MovesOrderingComparison {
+  IGNORE,
+  VERIFY,
+};
+struct ReferenceComparisonOptions {
+  // Constructor to initialize the options by default.
+  // NOLINTNEXTLINE(google-explicit-constructor)
+  ReferenceComparisonOptions(MovesOrderingComparison moves_ordering =
+      MovesOrderingComparison::VERIFY)
+      : moves_ordering(moves_ordering) {
+  }
+
+  const MovesOrderingComparison moves_ordering;
+};
+
 // Structure to describe rebalancing-related state of the cluster expressively
 // enough for the tests.
 struct TestClusterConfig {
@@ -104,6 +123,9 @@ struct TestClusterConfig {
   // The expected replica movements: the reference output of the algorithm
   // to compare with.
   const vector<TableReplicaMove> expected_moves;
+
+  // Options controlling how the reference and the actual results are compared.
+  const ReferenceComparisonOptions ref_comparison_options;
 };
 
 bool operator==(const TableReplicaMove& lhs, const TableReplicaMove& rhs) {
@@ -201,7 +223,40 @@ void VerifyLocationRebalancingMoves(const TestClusterConfig& cfg) {
     LocationBalancingAlgo algo;
     ASSERT_OK(algo.GetNextMoves(ci, 0, &moves));
   }
-  EXPECT_EQ(cfg.expected_moves, moves);
+  switch (cfg.ref_comparison_options.moves_ordering) {
+    case MovesOrderingComparison::IGNORE:
+      {
+        // The case when the order of moves is not important. For the
+        // rebalancing algorithms, it's natural to re-order contiguous moves
+        // of the same weight. This is because of:
+        //   a) randomly choosing among multiple options of the same weight
+        //   b) iterating over elements of a hash container keyed by the weight
+        //      of a move.
+        // Here it's necessary to normalize both the reference and the actual
+        // results before performing element-to-element comparison.
+        vector<TableReplicaMove> ref_moves(cfg.expected_moves);
+        constexpr auto kMovesComparator = [](const TableReplicaMove& lhs,
+                                             const TableReplicaMove& rhs) {
+          if (lhs.table_id != rhs.table_id) {
+            return lhs.table_id < rhs.table_id;
+          }
+          if (lhs.from != rhs.from) {
+            return lhs.from < rhs.from;
+          }
+          return lhs.to < rhs.to;
+        };
+        sort(ref_moves.begin(), ref_moves.end(), kMovesComparator);
+        sort(moves.begin(), moves.end(), kMovesComparator);
+        EXPECT_EQ(ref_moves, moves);
+      }
+      break;
+    case MovesOrderingComparison::VERIFY:
+      EXPECT_EQ(cfg.expected_moves, moves);
+      break;
+    default:
+      FAIL() << "unexpected reference comparison style";
+      break;
+  }
 }
 
 // Is 'cbi' balanced according to the two-dimensional greedy algorithm?
@@ -984,13 +1039,13 @@ TEST(RebalanceAlgoUnitTest, LocationBalancingSimpleST) {
       },
       { "0", "1", "2", },
       { { "A", { 6, 0, 0, } }, },
-      // TODO(aserbin): what about ordering?
       {
-        { "A", "0", "2" },
         { "A", "0", "1" },
+        { "A", "0", "2" },
         { "A", "0", "1" },
         { "A", "0", "2" },
-      }
+      },
+      { MovesOrderingComparison::IGNORE }
     },
     {
       {