You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/11/01 03:33:42 UTC
[1/2] kudu git commit: [rebalancer] location-aware rebalancer (part
7/n)
Repository: kudu
Updated Branches:
refs/heads/master fec218bf6 -> 4ec2598a3
[rebalancer] location-aware rebalancer (part 7/n)
Added PolicyFixer and integrated the cross-location balancing
algorithm. Added one basic integration test as well.
More integration tests will be added in a follow-up commit.
Change-Id: I9ace790aad1c1a4605ef90f6df2104f4a228a5b5
Reviewed-on: http://gerrit.cloudera.org:8080/11748
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/81bba247
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/81bba247
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/81bba247
Branch: refs/heads/master
Commit: 81bba2472b469996c3a0d8ed9f6412fe29cd4771
Parents: fec218b
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 19 23:38:16 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Nov 1 03:32:45 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/CMakeLists.txt | 3 +-
src/kudu/tools/rebalancer.cc | 492 ++++++++++++++++++++++++++--
src/kudu/tools/rebalancer.h | 158 +++++++--
src/kudu/tools/rebalancer_tool-test.cc | 128 +++++++-
src/kudu/tools/tool_action_cluster.cc | 27 +-
5 files changed, 740 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index d4017d6..1be02ca 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -185,7 +185,8 @@ ADD_KUDU_TEST(placement_policy_util-test)
ADD_KUDU_TEST(rebalance-test)
ADD_KUDU_TEST(rebalance_algo-test)
ADD_KUDU_TEST(rebalancer_tool-test
- NUM_SHARDS 8 PROCESSORS 3)
+ NUM_SHARDS 8 PROCESSORS 3
+ DATA_FILES ../scripts/assign-location.py)
ADD_KUDU_TEST_DEPENDENCIES(rebalancer_tool-test
kudu)
ADD_KUDU_TEST(tool_action-test)
http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index abf28f0..4d2d769 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -38,6 +38,7 @@
#include <glog/logging.h>
#include "kudu/client/client.h"
+#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -45,6 +46,7 @@
#include "kudu/tools/ksck.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/tools/ksck_results.h"
+#include "kudu/tools/placement_policy_util.h"
#include "kudu/tools/rebalance_algo.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tools/tool_replica_util.h"
@@ -82,14 +84,20 @@ Rebalancer::Config::Config(
size_t max_staleness_interval_sec,
int64_t max_run_time_sec,
bool move_rf1_replicas,
- bool output_replica_distribution_details)
- : master_addresses(std::move(master_addresses)),
- table_filters(std::move(table_filters)),
- max_moves_per_server(max_moves_per_server),
- max_staleness_interval_sec(max_staleness_interval_sec),
- max_run_time_sec(max_run_time_sec),
- move_rf1_replicas(move_rf1_replicas),
- output_replica_distribution_details(output_replica_distribution_details) {
+ bool output_replica_distribution_details,
+ bool run_policy_fixer,
+ bool run_cross_location_rebalancing,
+ bool run_intra_location_rebalancing)
+ : master_addresses(std::move(master_addresses)),
+ table_filters(std::move(table_filters)),
+ max_moves_per_server(max_moves_per_server),
+ max_staleness_interval_sec(max_staleness_interval_sec),
+ max_run_time_sec(max_run_time_sec),
+ move_rf1_replicas(move_rf1_replicas),
+ output_replica_distribution_details(output_replica_distribution_details),
+ run_policy_fixer(run_policy_fixer),
+ run_cross_location_rebalancing(run_cross_location_rebalancing),
+ run_intra_location_rebalancing(run_intra_location_rebalancing) {
DCHECK_GE(max_moves_per_server, 0);
}
@@ -103,7 +111,7 @@ Status Rebalancer::PrintStats(std::ostream& out) {
const KsckResults& results = ksck_->results();
ClusterRawInfo raw_info;
- RETURN_NOT_OK(KsckResultsToClusterRawInfo(results, &raw_info));
+ RETURN_NOT_OK(KsckResultsToClusterRawInfo(boost::none, results, &raw_info));
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
@@ -218,29 +226,143 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
}
ClusterRawInfo raw_info;
- RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_->results(), &raw_info));
+ RETURN_NOT_OK(
+ KsckResultsToClusterRawInfo(boost::none, ksck_->results(), &raw_info));
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
- TwoDimensionalGreedyRunner runner(this, config_.max_moves_per_server, deadline);
- RETURN_NOT_OK(runner.Init(config_.master_addresses));
- RETURN_NOT_OK(RunWith(&runner, result_status));
+ const auto& ts_id_by_location = ci.locality.servers_by_location;
+ if (ts_id_by_location.empty()) {
+ // Empty cluster: no tablet servers reported.
+ if (moves_count != nullptr) {
+ *moves_count = 0;
+ }
+ *result_status = RunStatus::CLUSTER_IS_BALANCED;
+ LOG(INFO) << "no tablet servers are reported: nothing to balance";
+ return Status::OK();
+ }
+
+ size_t moves_count_total = 0;
+ if (ts_id_by_location.size() == 1) {
+ const auto& location = ts_id_by_location.cbegin()->first;
+ LOG(INFO) << "running whole-cluster rebalancing";
+ IntraLocationRunner runner(
+ this, config_.max_moves_per_server, deadline, location);
+ RETURN_NOT_OK(runner.Init(config_.master_addresses));
+ RETURN_NOT_OK(RunWith(&runner, result_status));
+ moves_count_total += runner.moves_count();
+ } else {
+ // The essence of location-aware balancing:
+ // 1. Find tablets whose replicas placed in such a way that their
+ // distribution violates the main constraint of the placement policy.
+ // For each non-conforming tablet, move its replicas to restore
+ // the placement policy restrictions. In other words, if a location has
+ // more than the majority of replicas for some tablet,
+ // move the replicas of the tablet to other locations.
+ // 2. For every tablet whose replica placement does not violate the
+ // placement policy constraints, balance the load among locations.
+ // 3. Balance replica distribution within every location. This is a.k.a.
+ // intra-location balancing. The intra-location balancing involves
+ // moving replicas only within location, no replicas are moved between
+ // locations.
+ if (config_.run_policy_fixer) {
+ // Fix placement policy violations, if any.
+ LOG(INFO) << "fixing placement policy violations";
+ PolicyFixer runner(this, config_.max_moves_per_server, deadline);
+ RETURN_NOT_OK(runner.Init(config_.master_addresses));
+ RETURN_NOT_OK(RunWith(&runner, result_status));
+ moves_count_total += runner.moves_count();
+ }
+ if (config_.run_cross_location_rebalancing) {
+ // Run the rebalancing across locations (inter-location rebalancing).
+ LOG(INFO) << "running cross-location rebalancing";
+ CrossLocationRunner runner(this, config_.max_moves_per_server, deadline);
+ RETURN_NOT_OK(runner.Init(config_.master_addresses));
+ RETURN_NOT_OK(RunWith(&runner, result_status));
+ moves_count_total += runner.moves_count();
+ }
+ if (config_.run_intra_location_rebalancing) {
+ // Run the rebalancing within every location (intra-location rebalancing).
+ for (const auto& elem : ts_id_by_location) {
+ const auto& location = elem.first;
+ // TODO(aserbin): it would be nice to run these rebalancers in parallel
+ LOG(INFO) << "running rebalancer within location '" << location << "'";
+ IntraLocationRunner runner(
+ this, config_.max_moves_per_server, deadline, location);
+ RETURN_NOT_OK(runner.Init(config_.master_addresses));
+ RETURN_NOT_OK(RunWith(&runner, result_status));
+ moves_count_total += runner.moves_count();
+ }
+ }
+ }
if (moves_count != nullptr) {
- *moves_count = runner.moves_count();
+ *moves_count = moves_count_total;
}
return Status::OK();
}
+// Convert the ksck results into ClusterRawInfo. If 'location' is boost::none,
+// the information on the whole cluster is output into 'raw_info'. Otherwise,
+// only the following is output: the tablet servers at the specified location,
+// the tablets having at least one replica at those tablet servers (with their
+// replica lists trimmed down to the replicas at the location), and the tables
+// of those tablets.
Status Rebalancer::KsckResultsToClusterRawInfo(
+ const boost::optional<string>& location,
const KsckResults& ksck_info,
ClusterRawInfo* raw_info) {
DCHECK(raw_info);
- raw_info->tserver_summaries = ksck_info.tserver_summaries;
- raw_info->table_summaries = ksck_info.table_summaries;
- raw_info->tablet_summaries = ksck_info.tablet_summaries;
+ // Filter out entities that are not relevant to the specified location.
+ vector<KsckServerHealthSummary> tserver_summaries;
+ vector<KsckTabletSummary> tablet_summaries;
+ vector<KsckTableSummary> table_summaries;
+
+ if (!location) {
+ // Information on the whole cluster: plain copies, no need to reserve.
+ tserver_summaries = ksck_info.tserver_summaries;
+ tablet_summaries = ksck_info.tablet_summaries;
+ table_summaries = ksck_info.table_summaries;
+ } else {
+ // Information on the specified location only: filter out non-relevant info.
+ // The sizes of the source containers are the upper bounds for the sizes
+ // of the filtered ones.
+ tserver_summaries.reserve(ksck_info.tserver_summaries.size());
+ tablet_summaries.reserve(ksck_info.tablet_summaries.size());
+ table_summaries.reserve(ksck_info.table_summaries.size());
+
+ const auto& location_str = *location;
+
+ unordered_set<string> ts_ids_at_location;
+ for (const auto& summary : ksck_info.tserver_summaries) {
+ if (summary.ts_location == location_str) {
+ tserver_summaries.push_back(summary);
+ InsertOrDie(&ts_ids_at_location, summary.uuid);
+ }
+ }
+
+ unordered_set<string> table_ids_at_location;
+ for (const auto& summary : ksck_info.tablet_summaries) {
+ const auto& replicas = summary.replicas;
+ decltype(summary.replicas) replicas_at_location;
+ replicas_at_location.reserve(replicas.size());
+ for (const auto& replica : replicas) {
+ if (ContainsKey(ts_ids_at_location, replica.ts_uuid)) {
+ replicas_at_location.push_back(replica);
+ }
+ }
+ if (replicas_at_location.empty()) {
+ // Not a single replica of the tablet at the location: the tablet is not
+ // relevant, and neither is its table unless another tablet brings it in.
+ continue;
+ }
+ table_ids_at_location.insert(summary.table_id);
+ tablet_summaries.push_back(summary);
+ tablet_summaries.back().replicas = std::move(replicas_at_location);
+ }
+
+ for (const auto& summary : ksck_info.table_summaries) {
+ if (ContainsKey(table_ids_at_location, summary.id)) {
+ table_summaries.push_back(summary);
+ }
+ }
+ }
+
+ raw_info->tserver_summaries = std::move(tserver_summaries);
+ raw_info->table_summaries = std::move(table_summaries);
+ raw_info->tablet_summaries = std::move(tablet_summaries);
return Status::OK();
}
@@ -362,6 +484,60 @@ void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
*replica_moves = std::move(filtered_replica_moves);
}
+// Filter the candidate tablets in 'tablet_ids' for the cross-location replica
+// move 'move': a tablet is removed from the list if, after moving a replica
+// of it into the destination location, that location would host at least
+// consensus::MajoritySize() of the table's replication factor replicas.
+// The tablet servers of 'move' must be present in 'location_by_ts_id'
+// (looked up with FindOrDie). Returns Status::InvalidArgument if the source
+// and the destination tablet servers of 'move' are in the same location.
+// The elements of 'tablet_ids' which pass the filter keep their relative order.
+Status Rebalancer::FilterCrossLocationTabletCandidates(
+ const unordered_map<string, string>& location_by_ts_id,
+ const TabletsPlacementInfo& placement_info,
+ const TableReplicaMove& move,
+ vector<string>* tablet_ids) {
+ DCHECK(tablet_ids);
+
+ if (tablet_ids->empty()) {
+ // Nothing to filter.
+ return Status::OK();
+ }
+
+ const auto& dst_location = FindOrDie(location_by_ts_id, move.to);
+ const auto& src_location = FindOrDie(location_by_ts_id, move.from);
+
+ // Sanity check: the source and the destination tablet servers should be
+ // in different locations.
+ if (src_location == dst_location) {
+ return Status::InvalidArgument(Substitute(
+ "moving replicas of table $0: the same location '$1' for both "
+ "the source ($2) and the destination ($3) tablet servers",
+ move.table_id, src_location, move.from, move.to));
+ }
+
+ vector<string> tablet_ids_filtered;
+ for (auto& tablet_id : *tablet_ids) {
+ const auto& replica_count_info = FindOrDie(
+ placement_info.tablet_location_info, tablet_id);
+ const auto* count_ptr = FindOrNull(replica_count_info, dst_location);
+ if (count_ptr == nullptr) {
+ // Nothing else to clarify: not a single replica in the destination
+ // location for this candidate tablet.
+ tablet_ids_filtered.emplace_back(std::move(tablet_id));
+ continue;
+ }
+ const auto location_replica_num = *count_ptr;
+ const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, tablet_id);
+ const auto& table_info = FindOrDie(placement_info.tables_info, table_id);
+ const auto rf = table_info.replication_factor;
+ // The '+ 1' accounts for the replica being moved into the location.
+ if (location_replica_num + 1 >= consensus::MajoritySize(rf)) {
+ VLOG(1) << Substitute("destination location '$0' for candidate tablet $1 "
+ "already contains $2 of $3 replicas",
+ dst_location, tablet_id, location_replica_num, rf);
+ continue;
+ }
+ // The destination location would not get the majority of the tablet's
+ // replicas after the move: it's an acceptable candidate.
+ tablet_ids_filtered.emplace_back(std::move(tablet_id));
+ }
+
+ *tablet_ids = std::move(tablet_ids_filtered);
+
+ return Status::OK();
+}
+
Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const {
@@ -386,6 +562,18 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
}
}
+ auto& ts_uuids_by_location = result_info.locality.servers_by_location;
+ auto& location_by_ts_uuid = result_info.locality.location_by_ts_id;
+ for (const auto& summary : raw_info.tserver_summaries) {
+ const auto& ts_id = summary.uuid;
+ const auto& ts_location = summary.ts_location;
+ VLOG(1) << Substitute("found tserver $0 at location '$1'", ts_id, ts_location);
+ EmplaceOrDie(&location_by_ts_uuid, ts_id, ts_location);
+ auto& ts_ids = LookupOrEmplace(&ts_uuids_by_location,
+ ts_location, set<string>());
+ InsertOrDie(&ts_ids, ts_id);
+ }
+
for (const auto& s : raw_info.tserver_summaries) {
if (s.health != KsckServerHealth::HEALTHY) {
LOG(INFO) << Substitute("skipping tablet server $0 ($1) because of its "
@@ -495,6 +683,7 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
}
table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
}
+ // TODO(aserbin): add sanity checks on the result.
*info = std::move(result_info);
return Status::OK();
@@ -577,9 +766,10 @@ Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
return Status::OK();
}
-Status Rebalancer::GetClusterRawInfo(ClusterRawInfo* raw_info) {
+// Refresh the ksck results and convert them into 'raw_info'. If 'location'
+// is set, the output is limited to the entities relevant to that location;
+// boost::none means the whole cluster.
+Status Rebalancer::GetClusterRawInfo(const boost::optional<string>& location,
+ ClusterRawInfo* raw_info) {
RETURN_NOT_OK(RefreshKsckResults());
- return KsckResultsToClusterRawInfo(ksck_->results(), raw_info);
+ return KsckResultsToClusterRawInfo(location, ksck_->results(), raw_info);
}
Status Rebalancer::RefreshKsckResults() {
@@ -623,17 +813,21 @@ Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
return Status::OK();
}
- // Filter out moves for tablets which already have operations in progress.
- // The idea is simple: no more than one move operation per tablet should
- // ever be attempted.
+ // The GetNextMovesImpl() method prescribes replica movements using simplified
+ // logic that doesn't know about best practices of safe and robust Raft
+ // configuration changes. Here it's necessary to filter out moves for tablets
+ // which already have operations in progress. The idea is simple: don't start
+ // another operation for a tablet when there is still a pending operation
+ // for that tablet.
Rebalancer::FilterMoves(scheduled_moves_, &replica_moves);
LoadMoves(std::move(replica_moves));
- // TODO(aserbin): now this method reports on presence of some moves even if
- // all of those are in progress and no fresh new are available.
- // Would it be more convenient for to report only on the
- // fresh new moves and check for the presence of the scheduled
- // moves at the upper level?
+ // TODO(aserbin): this method reports on availability of move operations
+ // via the 'has_moves' parameter even if all of those were
+ // actually filtered out by the FilterMoves() method.
+ // Would it be more convenient to report only on the new,
+ // not-yet-in-progress operations and check for the presence
+ // of the scheduled moves at the upper level?
*has_moves = true;
return Status::OK();
}
@@ -733,7 +927,6 @@ void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
}
}
-// Return true if replica move operation has been scheduled successfully.
bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
bool* timed_out) {
DCHECK(has_errors);
@@ -746,13 +939,13 @@ bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
return false;
}
- // Only one move operation per step: it's necessary to update information
- // in the ts_per_op_count_ right after scheduling a single operation
+ // Schedule one operation per step: once an operation is scheduled, it's
+ // necessary to update the ts_per_op_count_ container immediately
// to avoid oversubscribing of the tablet servers.
size_t op_idx;
if (!FindNextMove(&op_idx)) {
- // Nothing to schedule: unfruitful outcome. Need to wait until
- // there is a slot at tablet server is available.
+ // Nothing to schedule yet: unfruitful outcome. Need to wait until there is
+ // an available slot at a tablet server.
return false;
}
@@ -853,8 +1046,9 @@ bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
// one step of the rebalancing.
Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
vector<ReplicaMove>* replica_moves) {
+ const auto& loc = location();
ClusterRawInfo raw_info;
- RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(&raw_info));
+ RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(loc, &raw_info));
// For simplicity, allow to run the rebalancing only when all tablet servers
// are in good shape. Otherwise, the rebalancing might interfere with the
@@ -867,6 +1061,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
}
}
+ TabletsPlacementInfo tpi;
+ if (!loc) {
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &tpi));
+ }
+
// The number of operations to output by the algorithm. Those will be
// translated into concrete tablet replica movement operations, the output of
// this method.
@@ -893,6 +1092,14 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
for (const auto& move : moves) {
vector<string> tablet_ids;
RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
+ if (!loc) {
+ // In case of cross-location (a.k.a. inter-location) rebalancing it is
+ // necessary to make sure the majority of replicas would not end up
+ // at the same location after the move. If so, remove those tablets
+ // from the list of candidates.
+ RETURN_NOT_OK(FilterCrossLocationTabletCandidates(
+ cluster_info.locality.location_by_ts_id, tpi, move, &tablet_ids));
+ }
// Shuffle the set of the tablet identifiers: that's to achieve even spread
// of moves across tables with the same skew.
std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
@@ -978,7 +1185,7 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
bool is_success) {
if (is_success) {
Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid };
- auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
+ scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
// Only one replica of a tablet can be moved at a time.
// TODO(aserbin): clarify on duplicates
//DCHECK(ins.second);
@@ -1015,12 +1222,227 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
}
}
-Rebalancer::TwoDimensionalGreedyRunner::TwoDimensionalGreedyRunner(
+// Runner for rebalancing replica distribution among the tablet servers
+// of the specified location only.
+Rebalancer::IntraLocationRunner::IntraLocationRunner(
+ Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline,
+ std::string location)
+ : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+ location_(std::move(location)) {
+}
+
+// Runner for rebalancing replica distribution across locations; it is not
+// bound to any particular location.
+Rebalancer::CrossLocationRunner::CrossLocationRunner(
Rebalancer* rebalancer,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline)
: AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
}
+// Runner for detecting and fixing placement policy violations, i.e. tablets
+// with the majority of replicas in a single location.
+Rebalancer::PolicyFixer::PolicyFixer(
+ Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline)
+ : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
+}
+
+// Initialize the runner: no moves are expected to be loaded at this point.
+Status Rebalancer::PolicyFixer::Init(vector<string> master_addresses) {
+ DCHECK(moves_to_schedule_.empty());
+ return BaseRunner::Init(std::move(master_addresses));
+}
+
+// Load the prescribed replica moves into moves_to_schedule_, keyed by
+// the UUID of the source tablet server ('ts_uuid_from') of each move.
+void Rebalancer::PolicyFixer::LoadMoves(
+ vector<ReplicaMove> replica_moves) {
+ // Replace the list of move operations to schedule. Even if it's not empty,
+ // some elements of it might be irrelevant anyway, so there is no need to
+ // keep any since the new information is the most up-to-date. The input list
+ // is already filtered and should not contain any operations which are
+ // tracked as already scheduled ones.
+ moves_to_schedule_.clear();
+
+ for (auto& move_info : replica_moves) {
+ auto ts_uuid = move_info.ts_uuid_from;
+ DCHECK(!ts_uuid.empty());
+ moves_to_schedule_.emplace(std::move(ts_uuid), std::move(move_info));
+ }
+
+ // Refresh the helper containers: every source tablet server gets an entry
+ // in op_count_per_ts_ and ts_per_op_count_ (starting with zero operations)
+ // so FindNextMove() can consider it when iterating over ts_per_op_count_.
+ for (const auto& elem : moves_to_schedule_) {
+ const auto& ts_uuid = elem.first;
+ DCHECK(!ts_uuid.empty());
+ if (op_count_per_ts_.emplace(ts_uuid, 0).second) {
+ // No operations for tablet server ts_uuid: add ts_per_op_count_ entry.
+ ts_per_op_count_.emplace(0, ts_uuid);
+ }
+ }
+}
+
+// Schedule the next policy-fixing move: pick a move with FindNextMove(),
+// request the replacement of the source replica, and move the entry from
+// moves_to_schedule_ into scheduled_moves_. Returns 'true' if a move has been
+// scheduled; otherwise returns 'false' with 'has_errors' and 'timed_out'
+// set accordingly.
+// NOTE(review): op_count_per_ts_/ts_per_op_count_ are not updated here when
+// a move is scheduled, while UpdateMovesInProgressStatus() calls
+// UpdateOnMoveCompleted() on completion -- confirm this is intended.
+bool Rebalancer::PolicyFixer::ScheduleNextMove(bool* has_errors,
+ bool* timed_out) {
+ DCHECK(has_errors);
+ DCHECK(timed_out);
+ *has_errors = false;
+ *timed_out = false;
+
+ if (deadline_ && MonoTime::Now() >= *deadline_) {
+ *timed_out = true;
+ return false;
+ }
+
+ ReplicaMove move_info;
+ if (!FindNextMove(&move_info)) {
+ return false;
+ }
+
+ // Request replacement of the replica at the source tablet server. The
+ // destination is not chosen here (hence '-> ?' in the log messages);
+ // presumably SetReplace() marks the replica with the REPLACE attribute.
+ // On failure, the move stays in moves_to_schedule_.
+ const auto s = SetReplace(client_,
+ move_info.tablet_uuid,
+ move_info.ts_uuid_from,
+ move_info.config_opid_idx);
+ if (!s.ok()) {
+ *has_errors = true;
+ return false;
+ }
+
+ // Remove the element from moves_to_schedule_.
+ bool erased = false;
+ auto range = moves_to_schedule_.equal_range(move_info.ts_uuid_from);
+ for (auto it = range.first; it != range.second; ++it) {
+ if (move_info.tablet_uuid == it->second.tablet_uuid) {
+ moves_to_schedule_.erase(it);
+ erased = true;
+ break;
+ }
+ }
+ CHECK(erased) << Substitute("T $0 P $1: move information not found",
+ move_info.tablet_uuid, move_info.ts_uuid_from);
+ LOG(INFO) << Substitute("tablet $0: $1 -> ? move scheduled",
+ move_info.tablet_uuid, move_info.ts_uuid_from);
+ // Add information on scheduled move into the scheduled_moves_.
+ // Only one replica of a tablet can be moved at a time.
+ auto tablet_uuid = move_info.tablet_uuid;
+ EmplaceOrDie(&scheduled_moves_, std::move(tablet_uuid), std::move(move_info));
+ return true;
+}
+
+// Poll the status of every scheduled replacement with CheckCompleteReplace().
+// Completed moves (successful or not) are counted in moves_count_ and removed
+// from scheduled_moves_; moves whose status cannot be retrieved are abandoned
+// (removed without being counted) and reported via 'has_errors'. Stops early
+// and sets 'timed_out' once the deadline has passed. Returns 'true' if at least
+// one move has completed.
+bool Rebalancer::PolicyFixer::UpdateMovesInProgressStatus(
+ bool* has_errors, bool* timed_out) {
+ DCHECK(has_errors);
+ DCHECK(timed_out);
+
+ auto has_updates = false;
+ auto error_count = 0;
+ auto out_of_time = false;
+ for (auto it = scheduled_moves_.begin(); it != scheduled_moves_.end(); ) {
+ if (deadline_ && MonoTime::Now() >= *deadline_) {
+ out_of_time = true;
+ break;
+ }
+ bool is_complete;
+ Status completion_status;
+ // These reference fields of the element: they must not be used after
+ // the element is erased.
+ const auto& tablet_id = it->second.tablet_uuid;
+ const auto& ts_uuid = it->second.ts_uuid_from;
+ auto s = CheckCompleteReplace(client_, tablet_id, ts_uuid,
+ &is_complete, &completion_status);
+ if (!s.ok()) {
+ // Update on the movement status has failed: remove the move operation
+ // as if it didn't exist. Once the cluster status is re-synchronized,
+ // the corresponding operation will be scheduled again, if needed.
+ // NOTE(review): unlike the completion path below, UpdateOnMoveCompleted()
+ // is not called here -- confirm the per-server op stats stay consistent.
+ ++error_count;
+ LOG(INFO) << Substitute("tablet $0: $1 -> ? move is abandoned: $2",
+ tablet_id, ts_uuid, s.ToString());
+ it = scheduled_moves_.erase(it);
+ continue;
+ }
+ DCHECK(s.ok());
+ if (is_complete) {
+ // The replacement has completed (success or failure): update the stats
+ // on the pending operations per server.
+ ++moves_count_;
+ has_updates = true;
+ LOG(INFO) << Substitute("tablet $0: $1 -> ? move completed: $2",
+ tablet_id, ts_uuid, completion_status.ToString());
+ UpdateOnMoveCompleted(ts_uuid);
+ it = scheduled_moves_.erase(it);
+ continue;
+ }
+ ++it;
+ }
+
+ *timed_out = out_of_time;
+ *has_errors = (error_count > 0);
+ return has_updates;
+}
+
+// Compute replica moves to fix placement policy violations in the whole
+// cluster. Requires all tablet servers to be HEALTHY (returns IllegalState
+// otherwise). Violations for tablets that already have a scheduled move are
+// skipped. The output moves specify only the replica to remove
+// ('ts_uuid_from'); the destination is not known at this point.
+Status Rebalancer::PolicyFixer::GetNextMovesImpl(
+ vector<ReplicaMove>* replica_moves) {
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(boost::none, &raw_info));
+
+ // For simplicity, allow to run the rebalancing only when all tablet servers
+ // are in good shape. Otherwise, the rebalancing might interfere with the
+ // automatic re-replication or get unexpected errors while moving replicas.
+ // TODO(aserbin): move it somewhere else?
+ for (const auto& s : raw_info.tserver_summaries) {
+ if (s.health != KsckServerHealth::HEALTHY) {
+ return Status::IllegalState(
+ Substitute("tablet server $0 ($1): unacceptable health status $2",
+ s.uuid, s.address, ServerHealthToString(s.health)));
+ }
+ }
+ // Only the locality information of the cluster info is used below.
+ ClusterInfo ci;
+ RETURN_NOT_OK(rebalancer_->BuildClusterInfo(raw_info, MovesInProgress(), &ci));
+
+ TabletsPlacementInfo placement_info;
+ RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info));
+
+ vector<PlacementPolicyViolationInfo> ppvi;
+ RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
+
+ // Filter out all reported violations which are already taken care of.
+ // The idea is to have not more than one pending operation per tablet.
+ {
+ decltype(ppvi) ppvi_filtered;
+ for (auto& info : ppvi) {
+ if (ContainsKey(scheduled_moves_, info.tablet_id)) {
+ continue;
+ }
+ ppvi_filtered.emplace_back(std::move(info));
+ }
+ ppvi = std::move(ppvi_filtered);
+ }
+
+ RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
+ placement_info, ci.locality, ppvi, replica_moves));
+
+ if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+ for (const auto& info : ppvi) {
+ VLOG(1) << Substitute("policy violation at location '$0': tablet $1",
+ info.majority_location, info.tablet_id);
+ }
+ for (const auto& move : *replica_moves) {
+ VLOG(1) << Substitute("policy fix for tablet $0: replica to remove $1",
+ move.tablet_uuid, move.ts_uuid_from);
+ }
+ }
+
+ return Status::OK();
+}
+
+// Output into 'move' a pending move from moves_to_schedule_, trying source
+// tablet servers in ts_per_op_count_ iteration order. Returns 'false' if no
+// pending move is found.
+bool Rebalancer::PolicyFixer::FindNextMove(ReplicaMove* move) {
+ DCHECK(move);
+ // TODO(aserbin): use pessimistic /2 limit for max_moves_per_servers_
+ // since the destination servers for the moves of the replicas marked with
+ // the REPLACE attribute are not known.
+
+ // Load the least loaded (in terms of scheduled moves) tablet servers first.
+ for (const auto& elem : ts_per_op_count_) {
+ const auto& ts_uuid = elem.second;
+ if (FindCopy(moves_to_schedule_, ts_uuid, move)) {
+ return true;
+ }
+ }
+ return false;
+}
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index c4f5824..7bb0d73 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -36,6 +36,12 @@
#include "kudu/util/status.h"
namespace kudu {
+namespace tools {
+struct TabletsPlacementInfo;
+} // namespace tools
+} // namespace kudu
+
+namespace kudu {
namespace client {
class KuduClient;
@@ -63,7 +69,10 @@ class Rebalancer {
size_t max_staleness_interval_sec = 300,
int64_t max_run_time_sec = 0,
bool move_rf1_replicas = false,
- bool output_replica_distribution_details = false);
+ bool output_replica_distribution_details = false,
+ bool run_policy_fixer = true,
+ bool run_cross_location_rebalancing = true,
+ bool run_intra_location_rebalancing = true);
// Kudu masters' RPC endpoints.
std::vector<std::string> master_addresses;
@@ -94,6 +103,23 @@ class Rebalancer {
// Whether Rebalancer::PrintStats() should output per-table and per-server
// replica distribution details.
bool output_replica_distribution_details;
+
+ // In case of multi-location cluster, whether to detect and fix placement
+ // policy violations. Fixing placement policy violations involves moving
+ // tablet replicas across different locations in the cluster.
+ // This setting is applicable to multi-location clusters only.
+ bool run_policy_fixer = true;
+
+ // In case of multi-location cluster, whether to move tablet replicas
+ // between locations in an attempt to spread tablet replicas among locations
+ // evenly (equalizing loads of locations throughout the cluster).
+ // This setting is applicable to multi-location clusters only.
+ bool run_cross_location_rebalancing = true;
+
+ // In case of multi-location cluster, whether to rebalance tablet replica
+ // distribution within each location.
+ // This setting is applicable to multi-location clusters only.
+ bool run_intra_location_rebalancing = true;
};
// Represents a concrete move of a replica from one tablet server to another.
@@ -137,14 +163,16 @@ class Rebalancer {
// the 'master_addresses' RPC endpoints.
virtual Status Init(std::vector<std::string> master_addresses) = 0;
- // Load information on prescribed replica movement operations. Also,
+ // Load information on the prescribed replica movement operations. Also,
// populate helper containers and other auxiliary run-time structures
// used by ScheduleNextMove(). This method is called with every batch
// of move operations output by the rebalancing algorithm once previously
// loaded moves have been scheduled.
virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0;
- // Schedule next replica move.
+ // Schedule next replica move. Returns 'true' if replica move operation
+ // has been scheduled successfully; otherwise returns 'false' and sets
+ // the 'has_errors' and 'timed_out' parameters accordingly.
virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
// Update statuses and auxiliary information on in-progress replica move
@@ -229,7 +257,8 @@ class Rebalancer {
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'deadline' specifies the deadline for the run, 'boost::none'
- // if no timeout is set.
+ // if no timeout is set. The location to rebalance (if any) is defined by
+ // the location() method of the concrete runner: boost::none means
+ // rebalancing across locations.
AlgoBasedRunner(Rebalancer* rebalancer,
size_t max_moves_per_server,
boost::optional<MonoTime> deadline);
@@ -242,14 +271,19 @@ class Rebalancer {
bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
+ // Get the cluster location the runner is slated to run/running at.
+ // 'boost::none' means the whole cluster.
+ virtual const boost::optional<std::string>& location() const = 0;
+
// Rebalancing algorithm that running uses to find replica moves.
virtual RebalancingAlgo* algorithm() = 0;
protected:
Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
- // Given the data in the helper containers, find the index describing
- // the next replica move and output it into the 'op_idx' parameter.
+ // Using the helper containers src_op_indices_ and dst_op_indices_,
+ // find the index of the most optimal replica movement operation
+ // and output the index into the 'op_idx' parameter.
bool FindNextMove(size_t* op_idx);
// Update the helper containers once a move operation has been scheduled.
@@ -277,38 +311,107 @@ class Rebalancer {
// tserver UUID (i.e. the key) as the destination of the move operation'.
std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
+ // Information on scheduled replica movement operations; keys are
+ // tablet UUIDs, values are ReplicaMove structures.
+ MovesInProgress scheduled_moves_;
+
// Random device and generator for selecting among multiple choices, when
// appropriate.
std::random_device random_device_;
std::mt19937 random_generator_;
}; // class AlgoBasedRunner
- class TwoDimensionalGreedyRunner : public AlgoBasedRunner {
+ // Runner for rebalancing replica distribution among the tablet servers of
+ // a single location, using TwoDimensionalGreedyAlgo.
+ class IntraLocationRunner : public AlgoBasedRunner {
public:
// The 'max_moves_per_server' specifies the maximum number of operations
// per tablet server (both the source and the destination are counted in).
// The 'deadline' specifies the deadline for the run, 'boost::none'
- // if no timeout is set.
- TwoDimensionalGreedyRunner(Rebalancer* rebalancer,
- size_t max_moves_per_server,
- boost::optional<MonoTime> deadline);
+ // if no timeout is set. In case of non-location aware cluster or if there
+ // is just one location defined in the whole cluster, the whole cluster will
+ // be rebalanced.
+ IntraLocationRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline,
+ std::string location);
RebalancingAlgo* algorithm() override {
return &algorithm_;
}
+ const boost::optional<std::string>& location() const override {
+ return location_;
+ }
+
private:
+ // The location to rebalance; always set since the constructor requires it.
+ const boost::optional<std::string> location_;
+
// An instance of the balancing algorithm.
TwoDimensionalGreedyAlgo algorithm_;
};
+ class CrossLocationRunner : public AlgoBasedRunner {
+ public:
+ // The 'max_moves_per_server' specifies the maximum number of operations
+ // per tablet server (both the source and the destination are counted in).
+ // The 'deadline' specifies the deadline for the run, 'boost::none'
+ // if no timeout is set. This runner moves replicas across locations.
+ CrossLocationRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline);
+
+ RebalancingAlgo* algorithm() override {
+ return &algorithm_;
+ }
+
+ const boost::optional<std::string>& location() const override {
+ return location_;
+ }
+
+ private:
+ const boost::optional<std::string> location_ = boost::none;
+
+ // An instance of the cross-location balancing algorithm.
+ LocationBalancingAlgo algorithm_;
+ };
+
+ class PolicyFixer : public BaseRunner {
+ public:
+ PolicyFixer(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline);
+
+ Status Init(std::vector<std::string> master_addresses) override;
+
+ void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+
+ bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
+
+ bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
+
+ private:
+ // Keys are tserver UUIDs matching the value's ts_uuid_from (move source).
+ typedef std::unordered_multimap<std::string, ReplicaMove> MovesToSchedule;
+
+ Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+ // Find the next replica move to schedule and output it into 'move'.
+ bool FindNextMove(ReplicaMove* move);
+
+ // An instance of the balancing algorithm.
+ LocationBalancingAlgo algorithm_;
+
+ // Moves yet to be scheduled, keyed by the source tserver UUID.
+ MovesToSchedule moves_to_schedule_;
+ };
+
friend class KsckResultsToClusterBalanceInfoTest;
- // Convert ksck results into information relevant to rebalancing the cluster.
- // Basically, 'raw' information is just a sub-set of relevant fields of the
- // KsckResults structure filtered to contain information only for the
- // specified location.
+ // Convert ksck results into information relevant to rebalancing the cluster
+ // at the location specified by the 'location' parameter ('boost::none' for
+ // 'location' means cross-location rebalancing). Basically,
+ // 'raw' information is just a sub-set of relevant fields of the KsckResults
+ // structure filtered to contain information only for the specified location.
static Status KsckResultsToClusterRawInfo(
+ const boost::optional<std::string>& location,
const KsckResults& ksck_info,
ClusterRawInfo* raw_info);
@@ -330,12 +433,21 @@ class Rebalancer {
static void FilterMoves(const MovesInProgress& scheduled_moves,
std::vector<ReplicaMove>* replica_moves);
+ // Filter the list of candidate tablets to make sure the location
+ // of the destination server would not contain the majority of replicas
+ // after the move. The 'tablet_ids' is an in-out parameter.
+ static Status FilterCrossLocationTabletCandidates(
+ const std::unordered_map<std::string, std::string>& location_by_ts_id,
+ const TabletsPlacementInfo& placement_info,
+ const TableReplicaMove& move,
+ std::vector<std::string>* tablet_ids);
+
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
// The 'moves_in_progress' parameter contains information on the replica moves
// which have been scheduled by a caller and still in progress: those are
// considered as successfully completed and applied to the 'raw_info' when
- // building ClusterInfo for the specified 'raw_info' input. The idea
+ // building ClusterBalanceInfo for the specified 'raw_info' input. The idea
// is to prevent the algorithm outputting the same moves again while some
// of the moves recommended at prior steps are still in progress.
// The result cluster balance information is output into the 'info' parameter.
@@ -347,8 +459,10 @@ class Rebalancer {
// Run rebalancing using the specified runner.
Status RunWith(Runner* runner, RunStatus* result_status);
- // Refresh the information on the cluster (involves running ksck).
- Status GetClusterRawInfo(ClusterRawInfo* raw_info);
+ // Refresh the information on the cluster for the specified location
+ // (involves running ksck).
+ Status GetClusterRawInfo(const boost::optional<std::string>& location,
+ ClusterRawInfo* raw_info);
Status GetNextMoves(Runner* runner,
std::vector<ReplicaMove>* replica_moves);
@@ -359,14 +473,6 @@ class Rebalancer {
// Configuration for the rebalancer.
const Config config_;
- // Random device and generator for selecting among multiple choices, when
- // appropriate.
- std::random_device random_device_;
- std::mt19937 random_generator_;
-
- // An instance of the balancing algorithm.
- TwoDimensionalGreedyAlgo algo_;
-
// Auxiliary Ksck object to get information on the cluster.
std::shared_ptr<Ksck> ksck_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/rebalancer_tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 10fd9dd..60e9f45 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -24,6 +24,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -36,12 +37,16 @@
#include "kudu/client/shared_ptr.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/tablet.pb.h"
@@ -70,6 +75,7 @@ using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
+using kudu::cluster::LocationInfo;
using kudu::itest::TabletServerMap;
using kudu::tserver::ListTabletsResponsePB;
using std::atomic;
@@ -81,6 +87,7 @@ using std::string;
using std::thread;
using std::tuple;
using std::unique_ptr;
+using std::unordered_map;
using std::vector;
using strings::Substitute;
@@ -211,7 +218,8 @@ static Status CreateUnbalancedTables(
int rep_factor,
int tserver_idx_from,
int tserver_num,
- int tserver_unresponsive_ms) {
+ int tserver_unresponsive_ms,
+ vector<string>* table_names = nullptr) {
// Keep running only some tablet servers and shut down the rest.
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
cluster->tablet_server(i)->Shutdown();
@@ -225,7 +233,7 @@ static Status CreateUnbalancedTables(
// which are up and running.
auto client_schema = KuduSchema::FromSchema(table_schema);
for (auto i = 0; i < num_tables; ++i) {
- const string table_name = Substitute(table_name_pattern, i);
+ string table_name = Substitute(table_name_pattern, i);
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
RETURN_NOT_OK(table_creator->table_name(table_name)
.schema(&client_schema)
@@ -240,6 +248,9 @@ static Status CreateUnbalancedTables(
Substitute("--table_num_replicas=$0", rep_factor),
"--string_fixed=unbalanced_tables_test",
}));
+ if (table_names) {
+ table_names->emplace_back(std::move(table_name));
+ }
}
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
@@ -394,7 +405,9 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
static const char* const kTableNamePattern;
void Prepare(const vector<string>& extra_tserver_flags = {},
- const vector<string>& extra_master_flags = {}) {
+ const vector<string>& extra_master_flags = {},
+ const LocationInfo& location_info = {},
+ vector<string>* created_tables_names = nullptr) {
const auto& scheme_flag = Substitute(
"--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
master_flags_.push_back(scheme_flag);
@@ -407,12 +420,12 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase {
FLAGS_num_tablet_servers = num_tservers_;
FLAGS_num_replicas = rep_factor_;
- NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
+ NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info));
ASSERT_OK(CreateUnbalancedTables(
cluster_.get(), client_.get(), schema_, kTableNamePattern,
num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
- tserver_unresponsive_ms_));
+ tserver_unresponsive_ms_, created_tables_names));
}
// When the rebalancer starts moving replicas, ksck detects corruption
@@ -1143,5 +1156,110 @@ TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
NO_FATALS(v.CheckCluster());
}
+// Basic fixture for the location-aware rebalancer tests.
+class LocationAwareRebalancingBasicTest : public RebalancingTest {
+ public:
+  LocationAwareRebalancingBasicTest()
+      : RebalancingTest(/*num_tables=*/ 12,
+                        /*rep_factor=*/ 3,
+                        /*num_tservers=*/ 6) {
+  }
+
+  bool is_343_scheme() const override {
+    // These tests are for the 3-4-3 replica management scheme only.
+    return true;
+  }
+};
+
+// Verifying the very basic functionality of the location-aware rebalancer:
+// given the very simple cluster configuration of 6 tablet servers spread
+// among 3 locations (2+2+2) and 12 tables with RF=3, the initially
+// imbalanced distribution of the replicas should become more balanced
+// and the placement policy constraints should be reimposed after running
+// the rebalancer tool.
+TEST_F(LocationAwareRebalancingBasicTest, Basic) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
+  vector<string> table_names;
+  NO_FATALS(Prepare({}, {}, location_info, &table_names));
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+  };
+
+  string out;
+  string err;
+  const auto s = RunKuduTool(tool_args, &out, &err);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+
+  unordered_map<string, itest::TServerDetails*> ts_map;
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(0),
+                                         cluster_->messenger(),
+                                         &ts_map));
+  ValueDeleter deleter(&ts_map);
+
+  // Build tablet server UUID --> location map.
+  unordered_map<string, string> location_by_ts_id;
+  for (const auto& elem : ts_map) {
+    EmplaceOrDie(&location_by_ts_id, elem.first, elem.second->location);
+  }
+
+  // For each table, check per-tablet placement and the overall distribution.
+  for (const auto& table_name : table_names) {
+    master::GetTableLocationsResponsePB table_locations;
+    ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(),
+                                       table_name, MonoDelta::FromSeconds(30),
+                                       master::ANY_REPLICA,
+                                       &table_locations));
+    const auto tablet_num = table_locations.tablet_locations_size();
+    auto total_table_replica_count = 0;
+    unordered_map<string, int> total_count_per_location;
+    for (auto i = 0; i < tablet_num; ++i) {
+      const auto& tablet_location = table_locations.tablet_locations(i);
+      const auto& tablet_id = tablet_location.tablet_id();
+      unordered_map<string, int> count_per_location;
+      for (const auto& replica : tablet_location.replicas()) {
+        const auto& ts_id = replica.ts_info().permanent_uuid();
+        const auto& location = FindOrDie(location_by_ts_id, ts_id);
+        ++LookupOrEmplace(&count_per_location, location, 0);
+        ++LookupOrEmplace(&total_count_per_location, location, 0);
+        ++total_table_replica_count;
+      }
+
+      // Make sure no location has the majority of replicas for the tablet.
+      for (const auto& elem : count_per_location) {
+        const auto& location = elem.first;
+        const auto replica_count = elem.second;
+        ASSERT_GT(consensus::MajoritySize(rep_factor_), replica_count)
+            << Substitute("tablet $0 (table $1): $2 replicas out of $3 total "
+                          "are in location $4",
+                          tablet_id, table_name, replica_count, rep_factor_,
+                          location);
+      }
+    }
+
+    // Verify the overall distribution: every location hosts some replicas.
+    ASSERT_EQ(location_info.size(), total_count_per_location.size()) << table_name;
+    double avg = static_cast<double>(total_table_replica_count) / location_info.size();
+    for (const auto& elem : total_count_per_location) {
+      const auto& location = elem.first;
+      const auto replica_num = elem.second;
+      ASSERT_GT(avg + 2, replica_num) << "at location " << location;
+      ASSERT_LT(avg - 2, replica_num) << "at location " << location;
+    }
+  }
+}
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/81bba247/src/kudu/tools/tool_action_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index b6a6981..d4989f3 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -101,6 +101,25 @@ DEFINE_bool(report_only, false,
"Whether to report on table- and cluster-wide replica distribution "
"skew and exit without doing any actual rebalancing");
+DEFINE_bool(disable_policy_fixer, false,
+            "In case of multi-location cluster, whether to skip detecting and "
+            "fixing placement policy violations. Fixing placement policy "
+            "violations involves moving tablet replicas across different "
+            "locations of the cluster. "
+            "This setting is applicable to multi-location clusters only.");
+
+DEFINE_bool(disable_cross_location_rebalancing, false,
+            "In case of multi-location cluster, whether to skip moving tablet "
+            "replicas between locations in an attempt to spread tablet "
+            "replicas among locations evenly (equalizing loads of locations "
+            "throughout the cluster). "
+            "This setting is applicable to multi-location clusters only.");
+
+DEFINE_bool(disable_intra_location_rebalancing, false,
+            "In case of multi-location cluster, whether to skip rebalancing "
+            "tablet replica distribution within each location. "
+            "This setting is applicable to multi-location clusters only.");
+
static bool ValidateMoveSingleReplicas(const char* flag_name,
const string& flag_value) {
const vector<string> allowed_values = { "auto", "enabled", "disabled" };
@@ -260,7 +279,10 @@ Status RunRebalance(const RunnerContext& context) {
FLAGS_max_staleness_interval_sec,
FLAGS_max_run_time_sec,
move_single_replicas,
- FLAGS_output_replica_distribution_details));
+ FLAGS_output_replica_distribution_details,
+ !FLAGS_disable_policy_fixer,
+ !FLAGS_disable_cross_location_rebalancing,
+ !FLAGS_disable_intra_location_rebalancing));
// Print info on pre-rebalance distribution of replicas.
RETURN_NOT_OK(rebalancer.PrintStats(cout));
@@ -346,6 +368,9 @@ unique_ptr<Mode> BuildClusterMode() {
.Description(desc)
.ExtraDescription(extra_desc)
.AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+ .AddOptionalParameter("disable_policy_fixer")
+ .AddOptionalParameter("disable_cross_location_rebalancing")
+ .AddOptionalParameter("disable_intra_location_rebalancing")
.AddOptionalParameter("max_moves_per_server")
.AddOptionalParameter("max_run_time_sec")
.AddOptionalParameter("max_staleness_interval_sec")
[2/2] kudu git commit: [rebalancer] location-aware rebalancer (part
8/n)
Posted by al...@apache.org.
[rebalancer] location-aware rebalancer (part 8/n)
Updated FindBestReplicaToReplace() to handle common and edge cases
of even replication factors. Added corresponding unit tests as well.
Change-Id: I8f8831d254b2ca0d9a12e0ffbc336a59c3c5c8de
Reviewed-on: http://gerrit.cloudera.org:8080/11761
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4ec2598a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4ec2598a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4ec2598a
Branch: refs/heads/master
Commit: 4ec2598a355bbbd1c00c4bdc03d37bafe04d0d5f
Parents: 81bba24
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Oct 23 17:50:10 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Nov 1 03:32:50 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/placement_policy_util-test.cc | 194 ++++++++++++++++++++++
src/kudu/tools/placement_policy_util.cc | 31 +++-
2 files changed, 222 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/4ec2598a/src/kudu/tools/placement_policy_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc
index 68266f3..7d48cbe 100644
--- a/src/kudu/tools/placement_policy_util-test.cc
+++ b/src/kudu/tools/placement_policy_util-test.cc
@@ -470,6 +470,24 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
{ { "t0", "L0" }, },
{},
},
+ {
+ // One RF=6 tablet with replica placement violating the placement policy.
+ {
+ { "T0", 6, { "t0", } },
+ },
+ {
+ { "L0", { "A", "B", "C", } },
+ { "L1", { "D", "E", } },
+ { "L2", { "F", } },
+ },
+ {
+ { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+ { "D", { "t0", } }, { "E", { "t0", } },
+ { "F", { "t0", } },
+ },
+ { { "t0", "L0" }, },
+ {}
+ },
};
for (auto idx = 0; idx < configs.size(); ++idx) {
SCOPED_TRACE(Substitute("test config index: $0", idx));
@@ -490,5 +508,181 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) {
}
}
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) {
+  const vector<TestClusterConfig> configs = {
+    {
+      // One location, RF=2 and RF=4: the violations cannot be fixed.
+      {
+        { "T0", 2, { "t0", } },
+        { "T1", 4, { "t1", } },
+      },
+      {
+        { "L0", { "A", "B", "C", "D", "E", } },
+      },
+      {
+        { "A", { "t0", "t1", } },
+        { "B", { "t0", "t1", } },
+        { "C", { "t1", } },
+        { "D", { "t1", } },
+      },
+      { { "t0", "L0" }, { "t1", "L0" }, },
+      {}
+    },
+    {
+      // Two locations, RF=2: cannot be fixed with only two locations.
+      {
+        { "T0", 2, { "t0", "t1", } },
+      },
+      {
+        { "L0", { "A", "B", } },
+        { "L1", { "D", "E", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } },
+        { "D", { "t1", } }, { "E", { "t1", } },
+      },
+      { { "t0", "L0" }, { "t1", "L1" }, },
+      {}
+    },
+    {
+      // Two locations, RF=2 and RF=4: cannot be fixed with two locations.
+      {
+        { "T0", 2, { "t0", } },
+        { "T1", 4, { "t1", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", "F", } },
+      },
+      {
+        { "A", { "t0", "t1", } }, { "B", { "t0", "t1", } },
+        { "D", { "t1", } }, { "E", { "t1", } },
+      },
+      { { "t0", "L0" }, { "t1", "L1" }, },
+      {}
+    },
+    {
+      // Two locations, two tablets, RF=2 (same layout as the RF=2 case above).
+      {
+        { "T0", 2, { "t0", "t1", } },
+      },
+      {
+        { "L0", { "A", "B", } },
+        { "L1", { "C", "D", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } },
+        { "C", { "t1", } }, { "D", { "t1", } },
+      },
+      { { "t0", "L0" }, { "t1", "L1" }, },
+      {}
+    },
+    {
+      // Three locations, RF=4: no placement survives losing any location.
+      {
+        { "T0", 4, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", } },
+        { "L1", { "D", "E", } },
+        { "L2", { "F", "G", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } },
+        { "D", { "t0", } },
+        { "F", { "t0", } },
+      },
+      { { "t0", "L0" }, },
+      {}
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) {
+  const vector<TestClusterConfig> configs = {
+    {
+      // Three locations, RF=6: 3+2+1 replica distribution, L0 is overloaded.
+      {
+        { "T0", 6, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", "F", } },
+        { "L2", { "G", "H", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+        { "D", { "t0", } }, { "F", { "t0", } },
+        { "H", { "t0", } },
+      },
+      { { "t0", "L0" }, },
+      { { "t0", "B" }, }
+    },
+    {
+      // Three locations, RF=8: 3+4+1 replica distribution, L1 is overloaded.
+      {
+        { "T0", 8, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", "F", "G", } },
+        { "L2", { "H", "J", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+        { "D", { "t0", } }, { "E", { "t0", } }, { "F", { "t0", } },
+        { "G", { "t0", } },
+        { "H", { "t0", } },
+      },
+      { { "t0", "L1" }, },
+      { { "t0", "D" }, }
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
+TEST_F(ClusterLocationTest, PlacementPolicyViolationsNoneEvenRF) {
+  const vector<TestClusterConfig> configs = {
+    {
+      // Three locations, RF=6: 2+2+2 replica distribution, no violations.
+      {
+        { "T0", 6, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", "F", } },
+        { "L2", { "G", "H", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } },
+        { "D", { "t0", } }, { "E", { "t0", } },
+        { "G", { "t0", } }, { "H", { "t0", } },
+      },
+      {},
+      {}
+    },
+    {
+      // Three locations, RF=8: 3+3+2 replica distribution, no violations.
+      {
+        { "T0", 8, { "t0", } },
+      },
+      {
+        { "L0", { "A", "B", "C", } },
+        { "L1", { "D", "E", "F", } },
+        { "L2", { "G", "H", } },
+      },
+      {
+        { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t0", } },
+        { "D", { "t0", } }, { "E", { "t0", } }, { "F", { "t0", } },
+        { "G", { "t0", } }, { "H", { "t0", } },
+      },
+      {},
+      {}
+    },
+  };
+  NO_FATALS(RunTest(configs));
+}
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/4ec2598a/src/kudu/tools/placement_policy_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc
index d6494eb..be1b502 100644
--- a/src/kudu/tools/placement_policy_util.cc
+++ b/src/kudu/tools/placement_policy_util.cc
@@ -68,17 +68,42 @@ Status FindBestReplicaToReplace(
}
const auto& tablet_id = info.tablet_id;
+ const auto location_num = ts_id_by_location.size();
// If a total number of locations is 2, it's impossible to make its replica
// distribution conform with the placement policy constraints.
const auto& table_id = FindOrDie(tablets_info.tablet_to_table_id, tablet_id);
const auto& table_info = FindOrDie(tablets_info.tables_info, table_id);
- if (ts_id_by_location.size() == 2 && table_info.replication_factor % 2 == 1) {
+
+ // There are a few edge cases which are most likely to occur, so let's have
+ // a special error message for those. In these cases there are too few
+ // locations relative to the replication factor, so it's impossible to find
+ // any replica movements to satisfy the placement policy constraints.
+ //
+  // One interesting case is placing replicas of a tablet with RF=4 in a
+  // cluster with 3 locations. In that case, it's impossible to place the
+  // replicas to satisfy the placement policy's constraints, since no replica
+  // placement keeps the majority of the replicas online if any single
+  // location becomes unavailable. Below are all the possible replica
+  // distributions for that case (modulo permutations of locations): in each,
+  // if the first location becomes unavailable, the majority of the replicas
+  // is lost and the tablet becomes unavailable.
+ //
+ // 4 + 0 + 0
+ // 3 + 1 + 0
+ // 2 + 1 + 1
+ //
+ // Note that with 3 locations and higher replication factors (5, 6, etc.),
+ // there is always a way to place tablet replicas to conform with the
+ // restriction mentioned above.
+ if (location_num == 2 ||
+ (location_num == 3 && table_info.replication_factor == 4)) {
return Status::ConfigurationError(Substitute(
"tablet $0 (table name '$1'): replica distribution cannot conform "
"with the placement policy constraints since its replication "
- "factor is odd ($2) and there are two locations in the cluster",
- tablet_id, table_info.name, table_info.replication_factor));
+ "factor is $2 and there are $3 locations in the cluster",
+ tablet_id, table_info.name,
+ table_info.replication_factor, location_num));
}
const auto& location = info.majority_location;