You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/02/08 00:58:39 UTC
[kudu] 02/02: [rebalancer] location-aware rebalancer (part 10/n)
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit cb5c6cd58bda8fc4efc01cca1c520be6b4fb6bf2
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Nov 6 15:11:25 2018 -0800
[rebalancer] location-aware rebalancer (part 10/n)
This patch makes the placement policy fixer and the cross-location
rebalancing phases take into account already scheduled replica
movements (a.k.a. moves in progress).
Also, added more integration tests for the LA rebalancer tool.
Change-Id: I27ada3c3dc50f45d1a95aaa51c36e2d40d8f3d49
Reviewed-on: http://gerrit.cloudera.org:8080/11892
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
(cherry picked from commit 4433fa5c787df13f95c595af15c470f89bd2957d)
Reviewed-on: http://gerrit.cloudera.org:8080/12395
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/tools/placement_policy_util.cc | 102 +++++++++++---
src/kudu/tools/placement_policy_util.h | 12 +-
src/kudu/tools/rebalance_algo.cc | 2 +-
src/kudu/tools/rebalance_algo.h | 4 +-
src/kudu/tools/rebalancer.cc | 71 +++++++---
src/kudu/tools/rebalancer.h | 4 -
src/kudu/tools/rebalancer_tool-test.cc | 228 +++++++++++++++++++++++++++++++-
7 files changed, 367 insertions(+), 56 deletions(-)
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index 48456f7..b92ff3b 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -209,8 +209,10 @@ Status FindBestReplicaToReplace(
} // anonymous namespace
-Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
- TabletsPlacementInfo* info) {
+Status BuildTabletsPlacementInfo(
+ const ClusterRawInfo& raw_info,
+ const Rebalancer::MovesInProgress& moves_in_progress,
+ TabletsPlacementInfo* info) {
DCHECK(info);
unordered_map<string, TableInfo> tables_info;
@@ -236,11 +238,9 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
decltype(TabletsPlacementInfo::tablet_location_info) tablet_location_info;
for (const auto& tablet_summary : raw_info.tablet_summaries) {
const auto& tablet_id = tablet_summary.id;
- if (tablet_summary.result != KsckCheckResult::HEALTHY) {
- // TODO(aserbin): should this be reported as some transient condition
- // to be taken into account? E.g., a tablet might be
- // in process of copying data to a new replica to replace
- // another replica which violates the placement policy.
+ // TODO(aserbin): process RF=1 tablets as necessary
+ if (tablet_summary.result != KsckCheckResult::HEALTHY &&
+ tablet_summary.result != KsckCheckResult::RECOVERING) {
VLOG(1) << Substitute("tablet $0: not considering replicas for movement "
"since the tablet's status is '$1'",
tablet_id,
@@ -249,31 +249,95 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
}
EmplaceOrDie(&tablet_to_table_id, tablet_id, tablet_summary.table_id);
+ // Check if it's one of the tablets which are currently being rebalanced.
+ // If so, interpret the move as successfully completed, updating the replica
+ // counts correspondingly.
TabletInfo tablet_info;
- for (const auto& replica_info : tablet_summary.replicas) {
- TabletReplicaInfo info;
- info.ts_uuid = replica_info.ts_uuid;
- if (replica_info.is_leader) {
- info.role = ReplicaRole::LEADER;
- } else {
- info.role = replica_info.is_voter ? ReplicaRole::FOLLOWER_VOTER
- : ReplicaRole::FOLLOWER_NONVOTER;
+ const auto it_pending_moves = moves_in_progress.find(tablet_id);
+ if (it_pending_moves != moves_in_progress.end()) {
+ const auto& move_info = it_pending_moves->second;
+ // Check if the target replica is present in the config.
+ bool is_target_replica_present = false;
+ for (const auto& tr : tablet_summary.replicas) {
+ if (tr.ts_uuid == move_info.ts_uuid_to) {
+ is_target_replica_present = true;
+ break;
+ }
+ }
+ // If the target replica is present, it will be processed in the code
+ // below. Otherwise, it's necessary to pretend as if the target replica
+ // is in the config already: the idea is to count in the absent target
+ // replica as if the movement has successfully completed already.
+ //
+ // NOTE: an empty UUID for the target replica means the source replica
+ // is being kicked out from the config and the system will
+ // automatically add the replacement replica at the most appropriate
+ // location.
+ if (!is_target_replica_present && !move_info.ts_uuid_to.empty()) {
+ ++LookupOrEmplace(&replica_num_by_ts_id, move_info.ts_uuid_to, 0);
+
+ // Populate TabletsPlacementInfo::tablet_location_info.
+ auto& count_by_location = LookupOrEmplace(&tablet_location_info,
+ tablet_id,
+ unordered_map<string, int>());
+ const auto& location = FindOrDie(location_by_ts_id, move_info.ts_uuid_to);
+ ++LookupOrEmplace(&count_by_location, location, 0);
+
+ {
+ // Faking an appearance of a new voter replica in the config: that's
+ // to reflect the completion of the scheduled replica movement.
+ TabletReplicaInfo info;
+ info.ts_uuid = move_info.ts_uuid_to;
+ info.role = ReplicaRole::FOLLOWER_VOTER;
+ tablet_info.replicas_info.emplace_back(std::move(info));
+ }
}
+ }
+
+ for (const auto& replica_info : tablet_summary.replicas) {
if (replica_info.is_leader && replica_info.consensus_state) {
const auto& cstate = *replica_info.consensus_state;
if (cstate.opid_index) {
tablet_info.config_idx = *cstate.opid_index;
}
}
- ++LookupOrEmplace(&replica_num_by_ts_id, replica_info.ts_uuid, 0);
+ bool do_count_replica = true;
+ if (it_pending_moves != moves_in_progress.end()) {
+ const auto& move_info = it_pending_moves->second;
+ if (move_info.ts_uuid_from == replica_info.ts_uuid) {
+ DCHECK(!replica_info.ts_uuid.empty());
+ // Do not count the source replica for the scheduled/in-progress
+ // replica movement. The idea is to consider pending replica movements
+ // as if they have already completed successfully.
+ do_count_replica = false;
+ }
+ }
+
+ auto& replica_count =
+ LookupOrEmplace(&replica_num_by_ts_id, replica_info.ts_uuid, 0);
+ if (do_count_replica) {
+ ++replica_count;
+ }
// Populate TabletsPlacementInfo::tablet_location_info.
auto& count_by_location = LookupOrEmplace(&tablet_location_info,
tablet_id,
unordered_map<string, int>());
- const auto& location = FindOrDie(location_by_ts_id, info.ts_uuid);
- ++LookupOrEmplace(&count_by_location, location, 0);
- tablet_info.replicas_info.emplace_back(std::move(info));
+ const auto& location = FindOrDie(location_by_ts_id, replica_info.ts_uuid);
+ auto& count = LookupOrEmplace(&count_by_location, location, 0);
+ if (do_count_replica) {
+ ++count;
+
+ TabletReplicaInfo info;
+ info.ts_uuid = replica_info.ts_uuid;
+ if (replica_info.is_leader) {
+ info.role = ReplicaRole::LEADER;
+ } else {
+ info.role = replica_info.is_voter ? ReplicaRole::FOLLOWER_VOTER
+ : ReplicaRole::FOLLOWER_NONVOTER;
+ }
+ tablet_info.replicas_info.emplace_back(std::move(info));
+ }
}
EmplaceOrDie(&tablets_info, tablet_id, std::move(tablet_info));
}
diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h
index 2938d17..7547b8e 100644
--- a/src/kudu/tools/placement_policy_util.h
+++ b/src/kudu/tools/placement_policy_util.h
@@ -76,9 +76,15 @@ struct TabletsPlacementInfo {
std::unordered_map<std::string, int> replica_num_by_ts_id;
};
-// Convert ClusterRawInfo into TabletsPlacementInfo.
-Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info,
- TabletsPlacementInfo* info);
+// Convert ClusterRawInfo into TabletsPlacementInfo. The 'moves_in_progress'
+// parameter contains information on the replica moves which have been scheduled
+// by a caller and are still in progress: those are considered as successfully
+// completed and applied to the 'raw_info' when building TabletsPlacementInfo
+// for the specified 'raw_info' input.
+Status BuildTabletsPlacementInfo(
+ const ClusterRawInfo& raw_info,
+ const Rebalancer::MovesInProgress& moves_in_progress,
+ TabletsPlacementInfo* info);
// Information on a violation of the basic placement policy constraint.
// The basic constraint is: for any tablet, no location should contain
diff --git a/src/kudu/tools/rebalance_algo.cc b/src/kudu/tools/rebalance_algo.cc
index 8065f51..eaec6da 100644
--- a/src/kudu/tools/rebalance_algo.cc
+++ b/src/kudu/tools/rebalance_algo.cc
@@ -552,7 +552,7 @@ Status LocationBalancingAlgo::FindBestMove(
// (i.e. number of table replicas / number of tablet servers), but it's
// always beneficial to have less loaded servers in absolute terms.
//
- // If there are multiple candiate tablet servers with the same extremum load,
+ // If there are multiple candidate tablet servers with the same extremum load,
// choose among them randomly.
//
// TODO(aserbin): implement fine-grained logic to select the best move among
diff --git a/src/kudu/tools/rebalance_algo.h b/src/kudu/tools/rebalance_algo.h
index 3b101c8..7ed1822 100644
--- a/src/kudu/tools/rebalance_algo.h
+++ b/src/kudu/tools/rebalance_algo.h
@@ -167,7 +167,7 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
// output into the 'server_uuids' parameter. Whether to consider most or least
// loaded servers is controlled by 'extremum'. An empty 'intersection' on
// return means no intersection was found for the mentioned sets of the
- // extremally loaded servers: in that case optimizing the load by table would
+ // extremely loaded servers: in that case optimizing the load by table would
// not affect the extreme load by server.
//
// None of the output parameters may be NULL.
@@ -220,7 +220,7 @@ class TwoDimensionalGreedyAlgo : public RebalancingAlgo {
// a table T. Assume we have locations L_0, ..., L_n, where
// replica_num(T, L_0), ..., replica_num(T, L_n) are numbers of replicas
// of T's tablets at corresponding locations. We want to make the following
-// ratios to devicate as less as possible:
+// ratios to deviate as little as possible:
//
// replica_num(T, L_0) / ts_num(L_0), ..., replica_num(T, L_n) / ts_num(L_n)
//
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 934d42e..a1548b4 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -460,6 +460,11 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
"the source ($2) and the destination ($3) tablet servers",
move.table_id, src_location, move.from, move.to));
}
+ if (dst_location.empty()) {
+ // The destination location is not specified, so no restrictions on the
+ // destination location to check for.
+ return Status::OK();
+ }
vector<string> tablet_ids_filtered;
for (auto& tablet_id : *tablet_ids) {
@@ -476,7 +481,11 @@ Status Rebalancer::FilterCrossLocationTabletCandidates(
const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
const auto rf = table_info.replication_factor;
- if (location_replica_num + 1 >= consensus::MajoritySize(rf)) {
+ // In case of RF=2*N+1, losing (N + 1) replicas means losing the majority.
+ // In case of RF=2*N, losing at least N replicas means losing the majority.
+ const auto replica_num_threshold = rf % 2 ? consensus::MajoritySize(rf)
+ : rf / 2;
+ if (location_replica_num + 1 >= replica_num_threshold) {
VLOG(1) << Substitute("destination location '$0' for candidate tablet $1 "
"already contains $2 of $3 replicas",
dst_location, tablet_id, location_replica_num, rf);
@@ -628,12 +637,13 @@ Status Rebalancer::PrintLocationBalanceStats(const string& location,
Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info,
ostream& out) const {
TabletsPlacementInfo placement_info;
- RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(
+ raw_info, MovesInProgress(), &placement_info));
vector<PlacementPolicyViolationInfo> ppvi;
RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
out << "Placement policy violations:" << endl;
if (ppvi.empty()) {
- out << " none" << endl << endl;;
+ out << " none" << endl << endl;
return Status::OK();
}
@@ -741,6 +751,32 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
// If so, interpret the move as successfully completed, updating the
// replica counts correspondingly.
const auto it_pending_moves = moves_in_progress.find(tablet.id);
+ if (it_pending_moves != moves_in_progress.end()) {
+ const auto& move_info = it_pending_moves->second;
+ bool is_target_replica_present = false;
+ // Verify that the target replica is present in the config.
+ for (const auto& tr : tablet.replicas) {
+ if (tr.ts_uuid == move_info.ts_uuid_to) {
+ is_target_replica_present = true;
+ break;
+ }
+ }
+ // If the target replica is present, it will be processed in the code
+ // below. Otherwise, it's necessary to pretend as if the target replica
+ // is in the config already: the idea is to count in the absent target
+ // replica as if the movement has successfully completed already.
+ auto it = tserver_replicas_count.find(move_info.ts_uuid_to);
+ if (!is_target_replica_present && !move_info.ts_uuid_to.empty() &&
+ it != tserver_replicas_count.end()) {
+ it->second++;
+ auto table_ins = table_replicas_info.emplace(
+ tablet.table_id, TableReplicasAtServer());
+ TableReplicasAtServer& replicas_at_server = table_ins.first->second;
+
+ auto replicas_ins = replicas_at_server.emplace(move_info.ts_uuid_to, 0);
+ replicas_ins.first->second++;
+ }
+ }
for (const auto& ri : tablet.replicas) {
// Increment total count of replicas at the tablet server.
@@ -755,22 +791,12 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
continue;
}
bool do_count_replica = true;
- if (it_pending_moves != moves_in_progress.end() &&
- tablet.result == KsckCheckResult::RECOVERING) {
+ if (it_pending_moves != moves_in_progress.end()) {
const auto& move_info = it_pending_moves->second;
- bool is_target_replica_present = false;
- // Verify that the target replica is present in the config.
- for (const auto& tr : tablet.replicas) {
- if (tr.ts_uuid == move_info.ts_uuid_to) {
- is_target_replica_present = true;
- break;
- }
- }
- if (move_info.ts_uuid_from == ri.ts_uuid && is_target_replica_present) {
- // It seems both the source and the destination replicas of the
- // scheduled replica movement operation are still in the config.
- // That's a sign that the move operation hasn't yet completed.
- // As explained above, let's interpret the move as successfully
+ if (move_info.ts_uuid_from == ri.ts_uuid) {
+ DCHECK(!ri.ts_uuid.empty());
+ // The source replica of the scheduled replica movement operation
+ // is still in the config. Interpreting the move as successfully
// completed, so the source replica should not be counted in.
do_count_replica = false;
}
@@ -1203,7 +1229,7 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
TabletsPlacementInfo tpi;
if (!loc) {
- RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &tpi));
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, scheduled_moves_, &tpi));
}
// Build 'tablet_id' --> 'target tablet replication factor' map.
@@ -1490,7 +1516,7 @@ bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
}
CHECK(erased) << Substitute("T $0 P $1: move information not found",
move_info.tablet_uuid, move_info.ts_uuid_from);
- LOG(INFO) << Substitute("tablet $0: $1 -> ? move scheduled",
+ LOG(INFO) << Substitute("tablet $0: '$1' -> '?' move scheduled",
move_info.tablet_uuid, move_info.ts_uuid_from);
// Add information on scheduled move into the scheduled_moves_.
// Only one replica of a tablet can be moved at a time.
@@ -1565,10 +1591,11 @@ Status Rebalancer::PolicyFixer::GetNextMovesImpl(
}
}
ClusterInfo ci;
- RETURN_NOT_OK(rebalancer_->BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+ RETURN_NOT_OK(rebalancer_->BuildClusterInfo(raw_info, scheduled_moves_, &ci));
TabletsPlacementInfo placement_info;
- RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+ RETURN_NOT_OK(
+ BuildTabletsPlacementInfo(raw_info, scheduled_moves_, &placement_info));
vector<PlacementPolicyViolationInfo> ppvi;
RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index d42cda5..9aa3241 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -318,10 +318,6 @@ class Rebalancer {
// tserver UUID (i.e. the key) as the destination of the move operation'.
std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
- // Information on scheduled replica movement operations; keys are
- // tablet UUIDs, values are ReplicaMove structures.
- MovesInProgress scheduled_moves_;
-
// Random device and generator for selecting among multiple choices, when
// appropriate.
std::random_device random_device_;
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 9fcd997..7c89418 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -21,6 +21,8 @@
#include <cstdlib>
#include <iterator>
#include <memory>
+#include <map>
+#include <numeric>
#include <ostream>
#include <string>
#include <thread>
@@ -83,6 +85,7 @@ using std::atomic;
using std::back_inserter;
using std::copy;
using std::endl;
+using std::ostream;
using std::ostringstream;
using std::string;
using std::thread;
@@ -387,7 +390,7 @@ TEST_P(RebalanceParamTest, Rebalance) {
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
- // The cluster was un-balanced, so many replicas should have been moved.
+ // The cluster was unbalanced, so many replicas should have been moved.
ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
}
@@ -395,6 +398,9 @@ TEST_P(RebalanceParamTest, Rebalance) {
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
+// Working around limitations of older libstdc++.
+static const unordered_set<string> kEmptySet = unordered_set<string>();
+
// Common base for the rebalancer-related test below.
class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
public:
@@ -418,9 +424,6 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
static const char* const kExitOnSignalStr;
static const char* const kTableNamePattern;
- // Working around limitations of older libstdc++.
- static const unordered_set<string> kEmptySet;
-
void Prepare(const vector<string>& extra_tserver_flags = {},
const vector<string>& extra_master_flags = {},
const LocationInfo& location_info = {},
@@ -520,7 +523,6 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
};
const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0";
-const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>();
typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
@@ -1427,5 +1429,221 @@ Placement policy violations:
<< ToolRunInfo(s, out, err);
}
+// Parameters for the location-aware rebalancing tests.
+typedef struct {
+ int replication_factor;
+ int tables_num;
+
+ // Information on the distribution of tablet servers among locations.
+ LocationInfo location_info;
+
+ // Locations where tablet servers are shut down during tablet creation
+ // to achieve non-balanced tablet replica distribution.
+ unordered_set<string> excluded_locations;
+} LaRebalancingParams;
+
+const LaRebalancingParams kLaRebalancingParams[] = {
+ // RF=3, 1 table, 1 location, 3 tablet servers.
+ { 3, 1,
+ {
+ { { "/A" }, 3 }
+ },
+ kEmptySet },
+
+ // RF=3, 3 tables, 2 locations, 4 (3 + 1) tablet servers.
+ { 3, 3,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 1 },
+ },
+ { "/B" }
+ },
+
+ // RF=3, 3 tables, 2 locations, 5 (3 + 2) tablet servers.
+ { 3, 3,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 2 },
+ },
+ { "/B" }
+ },
+
+ // RF=3, 3 tables, 2 locations, 6 (3 + 3) tablet servers.
+ { 3, 3,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 3 },
+ },
+ { "/B" }
+ },
+
+ // RF=3, 3 tables, 3 locations, 6 (3 + 2 + 1) tablet servers.
+ { 3, 3,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 2 },
+ { { "/C" }, 1 },
+ },
+ { "/B" }
+ },
+
+ // RF=5, 3 locations, 4 tables, 8 (3 + 2 + 3) tablet servers.
+ { 5, 4,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 2 },
+ { { "/C" }, 3 },
+ }, { "/C" }
+ },
+
+ // RF=7, 3 locations, 7 tables, 10 (3 + 4 + 3) tablet servers.
+ { 7, 7,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 4 },
+ { { "/C" }, 3 },
+ }, { "/C" }
+ },
+ // RF=2, 3 locations, 3 tables, 4 (1 + 2 + 1) tablet servers.
+ { 2, 3,
+ {
+ { { "/A" }, 1 },
+ { { "/B" }, 2 },
+ { { "/C" }, 1 },
+ }, { "/B" }
+ },
+
+ // RF=4, 4 locations, 8 tables, 6 (3 + 1 + 1 + 1) tablet servers.
+ { 4, 8,
+ {
+ { { "/A" }, 3 },
+ { { "/B" }, 1 },
+ { { "/C" }, 1 },
+ { { "/D" }, 1 },
+ }, { "/B", "/D" }
+ },
+};
+
+// Custom name generator for LA rebalancing scenarios described by
+// LaRebalancingParams.
+static string LaRebalancingTestName(
+ const testing::TestParamInfo<LaRebalancingParams>& info) {
+ ostringstream str;
+ const auto& p = info.param;
+ str << "idx" << info.index
+ << "_rf" << p.replication_factor
+ << "_t" << p.tables_num
+ << "_l";
+ for (const auto& elem : p.location_info) {
+ str << "_" << elem.second;
+ }
+ return str.str();
+}
+
+// This is used by 'operator<<(ostream&, const LaRebalancingParams&)' below.
+ostream& operator <<(ostream& out, const LocationInfo& info) {
+ out << "{ ";
+ for (const auto& elem : info) {
+ out << elem.first << ":" << elem.second << " ";
+ }
+ out << "}";
+ return out;
+}
+
+// This is useful to print the configuration of a failed param test.
+ostream& operator <<(ostream& out, const LaRebalancingParams& info) {
+ out << "{ rep_factor: " << info.replication_factor;
+ out << ", num_tables: " << info.tables_num
+ << ", location_info: ";
+ out << info.location_info;
+ out << " }";
+ return out;
+}
+
+class LocationAwareRebalancingParamTest :
+ public RebalancingTest,
+ public ::testing::WithParamInterface<LaRebalancingParams> {
+ public:
+ LocationAwareRebalancingParamTest()
+ : RebalancingTest(GetParam().tables_num,
+ GetParam().replication_factor,
+ std::accumulate(GetParam().location_info.begin(),
+ GetParam().location_info.end(), 0,
+ [](int sum, const LocationInfo::value_type& e) {
+ return sum + e.second;
+ })) {
+ }
+
+ bool is_343_scheme() const override {
+ // These tests are for the 3-4-3 replica management scheme only.
+ return true;
+ }
+};
+INSTANTIATE_TEST_CASE_P(, LocationAwareRebalancingParamTest,
+ ::testing::ValuesIn(kLaRebalancingParams),
+ LaRebalancingTestName);
+TEST_P(LocationAwareRebalancingParamTest, Rebalance) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+ const auto& param = GetParam();
+ const auto& location_info = param.location_info;
+ const auto& excluded_locations = param.excluded_locations;
+ const vector<string>& extra_master_flags = {
+ // In this test, the only users of the location assignment test script
+ // are the tablet servers: at this sub-class level it's hard to control
+ // when the test client connects to the external minicluster.
+ "--master_client_location_assignment_enabled=false",
+
+ // This test can exercise scenarios with even replication factor for tables.
+ "--allow_unsafe_replication_factor",
+ };
+ NO_FATALS(Prepare({}, extra_master_flags, location_info, excluded_locations));
+
+ const vector<string> tool_args = {
+ "cluster",
+ "rebalance",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--output_replica_distribution_details=true",
+ };
+
+ // The run of the location-aware rebalancing tool should report the cluster
+ // as balanced.
+ {
+ string out;
+ string err;
+ const Status s = RunKuduTool(tool_args, &out, &err);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+ ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+ << "stderr: " << err;
+ if (!param.excluded_locations.empty()) {
+ // In all location-aware cluster configurations where the replicas were
+ // initially placed everywhere but some 'excluded locations', the cluster
+ // was unbalanced, so some replicas should have been moved.
+ ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
+ }
+ if (param.replication_factor == 2 || param.location_info.size() == 2 ||
+ (param.location_info.size() == 3 && param.replication_factor == 4)) {
+ // In some cases it's impossible to satisfy the placement policy's
+ // constraints.
+ ASSERT_STR_CONTAINS(out, "Placement policy violations:\n");
+ ASSERT_STR_CONTAINS(out,
+ "Number of non-complying tables | Number of non-complying tablets\n")
+ << "stderr: " << err;
+ // The "--output_replica_distribution_details" flag is set: the tool
+ // should output details on the violations of the placement policy.
+ ASSERT_STR_CONTAINS(out, "Placement policy violation details:\n")
+ << "stderr: " << err;
+ } else if (param.location_info.size() > 1) {
+ // In other cases all the violations of the placement policy should be
+ // corrected (in the case of a single location the rebalancer does not
+ // check against placement policy violations because that case is treated
+ // the same as a location-unaware, whole cluster rebalancing).
+ ASSERT_STR_CONTAINS(out, "Placement policy violations:\n none\n");
+ }
+ }
+}
+
} // namespace tools
} // namespace kudu