You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/10/29 23:09:35 UTC
[4/4] kudu git commit: [rebalancer] location-aware rebalancer (part 4/n)
[rebalancer] location-aware rebalancer (part 4/n)
Refactored Rebalancer and Runner classes, separating common base for
a runner of the rebalancing process.
Change-Id: Id47183fc853573390b22ec714751adec93e0ea3a
Reviewed-on: http://gerrit.cloudera.org:8080/11745
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/34bb7f93
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/34bb7f93
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/34bb7f93
Branch: refs/heads/master
Commit: 34bb7f93b4bcb8d39803d0f0718148f84f5cca22
Parents: 43161e5
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 19 22:20:33 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Oct 29 23:05:50 2018 +0000
----------------------------------------------------------------------
src/kudu/tools/rebalancer.cc | 640 ++++++++++++++++++++------------------
src/kudu/tools/rebalancer.h | 209 ++++++++-----
2 files changed, 481 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index c3225f2..abf28f0 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -39,6 +39,7 @@
#include "kudu/client/client.h"
#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tools/ksck.h"
@@ -93,9 +94,7 @@ Rebalancer::Config::Config(
}
Rebalancer::Rebalancer(const Config& config)
- : config_(config),
- random_device_(),
- random_generator_(random_device_()) {
+ : config_(config) {
}
Status Rebalancer::PrintStats(std::ostream& out) {
@@ -109,13 +108,12 @@ Status Rebalancer::PrintStats(std::ostream& out) {
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
- auto& cbi = ci.balance;
// Per-server replica distribution stats.
{
out << "Per-server replica distribution summary:" << endl;
DataTable summary({"Statistic", "Value"});
- const auto& servers_load_info = cbi.servers_by_total_replica_count;
+ const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
if (servers_load_info.empty()) {
summary.AddRow({ "N/A", "N/A" });
} else {
@@ -159,7 +157,7 @@ Status Rebalancer::PrintStats(std::ostream& out) {
{
out << "Per-table replica distribution summary:" << endl;
DataTable summary({ "Replica Skew", "Value" });
- const auto& table_skew_info = cbi.table_info_by_skew;
+ const auto& table_skew_info = ci.balance.table_info_by_skew;
if (table_skew_info.empty()) {
summary.AddRow({ "N/A", "N/A" });
} else {
@@ -225,106 +223,145 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
ClusterInfo ci;
RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
- Runner runner(config_.max_moves_per_server, deadline);
+ TwoDimensionalGreedyRunner runner(this, config_.max_moves_per_server, deadline);
RETURN_NOT_OK(runner.Init(config_.master_addresses));
+ RETURN_NOT_OK(RunWith(&runner, result_status));
+ if (moves_count != nullptr) {
+ *moves_count = runner.moves_count();
+ }
- const MonoDelta max_staleness_delta =
- MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
- MonoTime staleness_start = MonoTime::Now();
- bool is_timed_out = false;
- bool resync_state = false;
- while (!is_timed_out) {
- if (resync_state) {
- resync_state = false;
- MonoDelta staleness_delta = MonoTime::Now() - staleness_start;
- if (staleness_delta > max_staleness_delta) {
- LOG(INFO) << Substitute("detected a staleness period of $0", staleness_delta.ToString());
- return Status::Incomplete(Substitute(
- "stalled with no progress for more than $0 seconds, aborting",
- max_staleness_delta.ToString()));
- }
- // The actual re-synchronization happens during GetNextMoves() below:
- // updated info is collected from the cluster and fed into the algorithm.
- LOG(INFO) << "re-synchronizing cluster state";
- }
+ return Status::OK();
+}
- {
- vector<Rebalancer::ReplicaMove> replica_moves;
- RETURN_NOT_OK(GetNextMoves(runner.scheduled_moves(), &replica_moves));
- if (replica_moves.empty() && runner.scheduled_moves().empty()) {
- // No moves are left: done!
- break;
- }
+Status Rebalancer::KsckResultsToClusterRawInfo(
+ const KsckResults& ksck_info,
+ ClusterRawInfo* raw_info) {
+ DCHECK(raw_info);
- // Filter out moves for tablets which already have operations in progress.
- FilterMoves(runner.scheduled_moves(), &replica_moves);
- runner.LoadMoves(std::move(replica_moves));
- }
+ raw_info->tserver_summaries = ksck_info.tserver_summaries;
+ raw_info->table_summaries = ksck_info.table_summaries;
+ raw_info->tablet_summaries = ksck_info.tablet_summaries;
- auto has_errors = false;
- while (!is_timed_out) {
- auto is_scheduled = runner.ScheduleNextMove(&has_errors, &is_timed_out);
- resync_state |= has_errors;
- if (resync_state || is_timed_out) {
- break;
- }
- if (is_scheduled) {
- // Reset the start of the staleness interval: there was some progress
- // in scheduling new move operations.
- staleness_start = MonoTime::Now();
+ return Status::OK();
+}
- // Continue scheduling available move operations while there is enough
- // capacity, i.e. until number of pending move operations on every
- // involved tablet server reaches max_moves_per_server. Once no more
- // operations can be scheduled, it's time to check for their status.
+// Given high-level description of moves, find tablets with replicas at the
+// corresponding tablet servers to satisfy those high-level descriptions.
+// The idea is to find all tablets of the specified table that would have a
+// replica at the source server, but would not have a replica at the destination
+// server. That is to satisfy the restriction of having no more than one replica
+// of the same tablet per server.
+//
+// An additional constraint: it's better not to move leader replicas, if
+// possible. If a client has a write operation in progress, moving leader
+// replicas of affected tablets would make the client re-resolve new leaders
+// and retry the operations. Moving leader replicas is used as last resort
+// when no other candidates are left.
+Status Rebalancer::FindReplicas(const TableReplicaMove& move,
+ const ClusterRawInfo& raw_info,
+ vector<string>* tablet_ids) {
+ const auto& table_id = move.table_id;
+
+ // Tablet ids of replicas on the source tserver that are non-leaders.
+ vector<string> tablet_uuids_src;
+ // Tablet ids of replicas on the source tserver that are leaders.
+ vector<string> tablet_uuids_src_leaders;
+ // UUIDs of tablets of the selected table at the destination tserver.
+ vector<string> tablet_uuids_dst;
+
+ for (const auto& tablet_summary : raw_info.tablet_summaries) {
+ if (tablet_summary.table_id != table_id) {
+ continue;
+ }
+ if (tablet_summary.result != KsckCheckResult::HEALTHY) {
+ VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
+ "as candidates for movement since the tablet's "
+ "status is '$2'",
+ table_id, tablet_summary.id,
+ KsckCheckResultToString(tablet_summary.result));
+ continue;
+ }
+ for (const auto& replica_summary : tablet_summary.replicas) {
+ if (replica_summary.ts_uuid != move.from &&
+ replica_summary.ts_uuid != move.to) {
continue;
}
-
- // Poll for the status of pending operations. If some of the in-flight
- // operations are complete, it might be possible to schedule new ones
- // by calling Runner::ScheduleNextMove().
- auto has_updates = runner.UpdateMovesInProgressStatus(&has_errors,
- &is_timed_out);
- if (has_updates) {
- // Reset the start of the staleness interval: there was some updates
- // on the status of scheduled move operations.
- staleness_start = MonoTime::Now();
+ if (!replica_summary.ts_healthy) {
+ VLOG(1) << Substitute("table $0: not considering replica movement "
+ "from $1 to $2 since server $3 is not healthy",
+ table_id,
+ move.from, move.to, replica_summary.ts_uuid);
+ continue;
}
- resync_state |= has_errors;
- if (resync_state || is_timed_out || !has_updates) {
- // If there were errors while trying to get the statuses of pending
- // operations it's necessary to re-synchronize the state of the cluster:
- // most likely something has changed, so it's better to get a new set
- // of planned moves.
- break;
+ if (replica_summary.ts_uuid == move.from) {
+ if (replica_summary.is_leader) {
+ tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
+ } else {
+ tablet_uuids_src.emplace_back(tablet_summary.id);
+ }
+ } else {
+ DCHECK_EQ(move.to, replica_summary.ts_uuid);
+ tablet_uuids_dst.emplace_back(tablet_summary.id);
}
-
- // Sleep a bit before going next cycle of status polling.
- SleepFor(MonoDelta::FromMilliseconds(200));
}
}
+ sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
+ sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
- *result_status = is_timed_out ? RunStatus::TIMED_OUT
- : RunStatus::CLUSTER_IS_BALANCED;
- if (moves_count) {
- *moves_count = runner.moves_count();
- }
+ vector<string> tablet_uuids;
+ set_difference(
+ tablet_uuids_src.begin(), tablet_uuids_src.end(),
+ tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+ inserter(tablet_uuids, tablet_uuids.begin()));
- return Status::OK();
-}
+ if (!tablet_uuids.empty()) {
+ // If there are tablets with non-leader replicas at the source server,
+ // those are the best candidates for movement.
+ tablet_ids->swap(tablet_uuids);
+ return Status::OK();
+ }
-Status Rebalancer::KsckResultsToClusterRawInfo(
- const KsckResults& ksck_info,
- ClusterRawInfo* raw_info) {
- DCHECK(raw_info);
+ // If no tablets with non-leader replicas were found, resort to tablets with
+ // leader replicas at the source server.
+ DCHECK(tablet_uuids.empty());
+ sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
+ set_difference(
+ tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
+ tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+ inserter(tablet_uuids, tablet_uuids.begin()));
- raw_info->tserver_summaries = ksck_info.tserver_summaries;
- raw_info->table_summaries = ksck_info.table_summaries;
- raw_info->tablet_summaries = ksck_info.tablet_summaries;
+ tablet_ids->swap(tablet_uuids);
return Status::OK();
}
+void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
+ vector<ReplicaMove>* replica_moves) {
+ unordered_set<string> tablet_uuids;
+ vector<ReplicaMove> filtered_replica_moves;
+ for (auto& move_op : *replica_moves) {
+ const auto& tablet_uuid = move_op.tablet_uuid;
+ if (ContainsKey(scheduled_moves, tablet_uuid)) {
+ // There is a move operation in progress for the tablet, don't schedule
+ // another one.
+ continue;
+ }
+ if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
+ filtered_replica_moves.emplace_back(std::move(move_op));
+ } else {
+ // Rationale behind the unique tablet constraint: the implementation of
+ // the Run() method is designed to re-order operations suggested by the
+ // high-level algorithm to use the op-count-per-tablet-server capacity
+ // as much as possible. Right now, the RunStep() method outputs only one
+ // move operation per tablet in every batch. The code below is to
+ // enforce the contract between Run() and RunStep() methods.
+ LOG(DFATAL) << "detected multiple replica move operations for the same "
+ "tablet " << tablet_uuid;
+ }
+ }
+ *replica_moves = std::move(filtered_replica_moves);
+}
+
Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const {
@@ -463,175 +500,88 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
return Status::OK();
}
-
-// Run one step of the rebalancer. Due to the inherent restrictions of the
-// rebalancing engine, no more than one replica per tablet is moved during
-// one step of the rebalancing.
-Status Rebalancer::GetNextMoves(const MovesInProgress& moves_in_progress,
- vector<ReplicaMove>* replica_moves) {
- RETURN_NOT_OK(RefreshKsckResults());
- const auto& ksck_info = ksck_->results();
-
- // For simplicity, allow to run the rebalancing only when all tablet servers
- // are in good shape. Otherwise, the rebalancing might interfere with the
- // automatic re-replication or get unexpected errors while moving replicas.
- for (const auto& s : ksck_info.tserver_summaries) {
- if (s.health != KsckServerHealth::HEALTHY) {
- return Status::IllegalState(
- Substitute("tablet server $0 ($1): unacceptable health status $2",
- s.uuid, s.address, ServerHealthToString(s.health)));
+Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
+ const MonoDelta max_staleness_delta =
+ MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
+ MonoTime staleness_start = MonoTime::Now();
+ bool is_timed_out = false;
+ bool resync_state = false;
+ while (!is_timed_out) {
+ if (resync_state) {
+ resync_state = false;
+ MonoDelta staleness_delta = MonoTime::Now() - staleness_start;
+ if (staleness_delta > max_staleness_delta) {
+ LOG(INFO) << Substitute("detected a staleness period of $0",
+ staleness_delta.ToString());
+ return Status::Incomplete(Substitute(
+ "stalled with no progress for more than $0 seconds, aborting",
+ max_staleness_delta.ToString()));
+ }
+ // The actual re-synchronization happens during GetNextMoves() below:
+ // updated info is collected from the cluster and fed into the algorithm.
+ LOG(INFO) << "re-synchronizing cluster state";
}
- }
-
- // The number of operations to output by the algorithm. Those will be
- // translated into concrete tablet replica movement operations, the output of
- // this method.
- const size_t max_moves = config_.max_moves_per_server *
- ksck_info.tserver_summaries.size() * 5;
- ClusterRawInfo raw_info;
- RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_info, &raw_info));
+ bool has_more_moves = false;
+ RETURN_NOT_OK(runner->GetNextMoves(&has_more_moves));
+ if (!has_more_moves) {
+ // No moves are left, done!
+ break;
+ }
- replica_moves->clear();
- vector<TableReplicaMove> moves;
- ClusterInfo cluster_info;
- RETURN_NOT_OK(BuildClusterInfo(raw_info, moves_in_progress, &cluster_info));
- RETURN_NOT_OK(algo_.GetNextMoves(cluster_info, max_moves, &moves));
- if (moves.empty()) {
- // No suitable moves were found: the cluster is balanced,
- // assuming all pending moves, if any, will succeed.
- return Status::OK();
- }
- unordered_set<string> tablets_in_move;
- std::transform(moves_in_progress.begin(), moves_in_progress.end(),
- inserter(tablets_in_move, tablets_in_move.begin()),
- [](const MovesInProgress::value_type& elem) {
- return elem.first;
- });
- for (const auto& move : moves) {
- vector<string> tablet_ids;
- RETURN_NOT_OK(FindReplicas(move, ksck_info, &tablet_ids));
- // Shuffle the set of the tablet identifiers: that's to achieve even spread
- // of moves across tables with the same skew.
- std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
- string move_tablet_id;
- for (const auto& tablet_id : tablet_ids) {
- if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
- // For now, choose the very first tablet that does not have replicas
- // in move. Later on, additional logic might be added to find
- // the best candidate.
- move_tablet_id = tablet_id;
+ auto has_errors = false;
+ while (!is_timed_out) {
+ auto is_scheduled = runner->ScheduleNextMove(&has_errors, &is_timed_out);
+ resync_state |= has_errors;
+ if (resync_state || is_timed_out) {
break;
}
- }
- if (move_tablet_id.empty()) {
- LOG(WARNING) << Substitute(
- "table $0: could not find any suitable replica to move "
- "from server $1 to server $2", move.table_id, move.from, move.to);
- continue;
- }
- ReplicaMove info;
- info.tablet_uuid = move_tablet_id;
- info.ts_uuid_from = move.from;
- info.ts_uuid_to = move.to;
- replica_moves->emplace_back(std::move(info));
- // Mark the tablet as 'has a replica in move'.
- tablets_in_move.emplace(move_tablet_id);
- }
-
- return Status::OK();
-}
-
-// Given high-level description of moves, find tablets with replicas at the
-// corresponding tablet servers to satisfy those high-level descriptions.
-// The idea is to find all tablets of the specified table that would have a
-// replica at the source server, but would not have a replica at the destination
-// server. That is to satisfy the restriction of having no more than one replica
-// of the same tablet per server.
-//
-// An additional constraint: it's better not to move leader replicas, if
-// possible. If a client has a write operation in progress, moving leader
-// replicas of affected tablets would make the client to re-resolve new leaders
-// and retry the operations. Moving leader replicas is used as last resort
-// when no other candidates are left.
-Status Rebalancer::FindReplicas(const TableReplicaMove& move,
- const KsckResults& ksck_info,
- vector<string>* tablet_ids) const {
- const auto& table_id = move.table_id;
-
- // Tablet ids of replicas on the source tserver that are non-leaders.
- vector<string> tablet_uuids_src;
- // Tablet ids of replicas on the source tserver that are leaders.
- vector<string> tablet_uuids_src_leaders;
- // UUIDs of tablets of the selected table at the destination tserver.
- vector<string> tablet_uuids_dst;
+ if (is_scheduled) {
+ // Reset the start of the staleness interval: there was some progress
+ // in scheduling new move operations.
+ staleness_start = MonoTime::Now();
- for (const auto& tablet_summary : ksck_info.tablet_summaries) {
- if (tablet_summary.table_id != table_id) {
- continue;
- }
- if (tablet_summary.result != KsckCheckResult::HEALTHY) {
- VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
- "as candidates for movement since the tablet's "
- "status is '$2'",
- table_id, tablet_summary.id,
- KsckCheckResultToString(tablet_summary.result));
- continue;
- }
- for (const auto& replica_summary : tablet_summary.replicas) {
- if (replica_summary.ts_uuid != move.from &&
- replica_summary.ts_uuid != move.to) {
+ // Continue scheduling available move operations while there is enough
+ // capacity, i.e. until number of pending move operations on every
+ // involved tablet server reaches max_moves_per_server. Once no more
+ // operations can be scheduled, it's time to check for their status.
continue;
}
- if (!replica_summary.ts_healthy) {
- VLOG(1) << Substitute("table $0: not considering replica movement "
- "from $1 to $2 since server $3 is not healthy",
- table_id,
- move.from, move.to, replica_summary.ts_uuid);
- continue;
+
+ // Poll for the status of pending operations. If some of the in-flight
+ // operations are complete, it might be possible to schedule new ones
+ // by calling Runner::ScheduleNextMove().
+ auto has_updates = runner->UpdateMovesInProgressStatus(&has_errors,
+ &is_timed_out);
+ if (has_updates) {
+ // Reset the start of the staleness interval: there were some updates
+ // on the status of scheduled move operations.
+ staleness_start = MonoTime::Now();
}
- if (replica_summary.ts_uuid == move.from) {
- if (replica_summary.is_leader) {
- tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
- } else {
- tablet_uuids_src.emplace_back(tablet_summary.id);
- }
- } else {
- DCHECK_EQ(move.to, replica_summary.ts_uuid);
- tablet_uuids_dst.emplace_back(tablet_summary.id);
+ resync_state |= has_errors;
+ if (resync_state || is_timed_out || !has_updates) {
+ // If there were errors while trying to get the statuses of pending
+ // operations it's necessary to re-synchronize the state of the cluster:
+ // most likely something has changed, so it's better to get a new set
+ // of planned moves.
+ break;
}
- }
- }
- sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
- sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
- vector<string> tablet_uuids;
- set_difference(
- tablet_uuids_src.begin(), tablet_uuids_src.end(),
- tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
- inserter(tablet_uuids, tablet_uuids.begin()));
-
- if (!tablet_uuids.empty()) {
- // If there are tablets with non-leader replicas at the source server,
- // those are the best candidates for movement.
- tablet_ids->swap(tablet_uuids);
- return Status::OK();
+ // Sleep a bit before going next cycle of status polling.
+ SleepFor(MonoDelta::FromMilliseconds(200));
+ }
}
- // If no tablets with non-leader replicas were found, resort to tablets with
- // leader replicas at the source server.
- DCHECK(tablet_uuids.empty());
- sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
- set_difference(
- tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
- tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
- inserter(tablet_uuids, tablet_uuids.begin()));
-
- tablet_ids->swap(tablet_uuids);
-
+ *result_status = is_timed_out ? RunStatus::TIMED_OUT
+ : RunStatus::CLUSTER_IS_BALANCED;
return Status::OK();
}
+Status Rebalancer::GetClusterRawInfo(ClusterRawInfo* raw_info) {
+ RETURN_NOT_OK(RefreshKsckResults());
+ return KsckResultsToClusterRawInfo(ksck_->results(), raw_info);
+}
+
Status Rebalancer::RefreshKsckResults() {
shared_ptr<KsckCluster> cluster;
RETURN_NOT_OK_PREPEND(
@@ -643,47 +593,20 @@ Status Rebalancer::RefreshKsckResults() {
return Status::OK();
}
-void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
- vector<ReplicaMove>* replica_moves) {
- unordered_set<string> tablet_uuids;
- vector<ReplicaMove> filtered_replica_moves;
- for (auto&& move_op : *replica_moves) {
- const auto& tablet_uuid = move_op.tablet_uuid;
- if (scheduled_moves.find(tablet_uuid) != scheduled_moves.end()) {
- // There is a move operation in progress for the tablet, don't schedule
- // another one.
- continue;
- }
- if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
- filtered_replica_moves.push_back(std::move(move_op));
- } else {
- // Rationale behind the unique tablet constraint: the implementation of
- // the Run() method is designed to re-order operations suggested by the
- // high-level algorithm to use the op-count-per-tablet-server capacity
- // as much as possible. Right now, the RunStep() method outputs only one
- // move operation per tablet in every batch. The code below is to
- // enforce the contract between Run() and RunStep() methods.
- LOG(DFATAL) << "detected multiple replica move operations for the same "
- "tablet " << tablet_uuid;
- }
- }
- replica_moves->swap(filtered_replica_moves);
-}
-
-Rebalancer::Runner::Runner(size_t max_moves_per_server,
- const boost::optional<MonoTime>& deadline)
- : max_moves_per_server_(max_moves_per_server),
- deadline_(deadline),
+Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline)
+ : rebalancer_(rebalancer),
+ max_moves_per_server_(max_moves_per_server),
+ deadline_(std::move(deadline)),
moves_count_(0) {
+ CHECK(rebalancer_);
}
-Status Rebalancer::Runner::Init(vector<string> master_addresses) {
+Status Rebalancer::BaseRunner::Init(vector<string> master_addresses) {
DCHECK_EQ(0, moves_count_);
- DCHECK(src_op_indices_.empty());
- DCHECK(dst_op_indices_.empty());
DCHECK(op_count_per_ts_.empty());
DCHECK(ts_per_op_count_.empty());
- DCHECK(scheduled_moves_.empty());
DCHECK(master_addresses_.empty());
DCHECK(client_.get() == nullptr);
master_addresses_ = std::move(master_addresses);
@@ -692,7 +615,60 @@ Status Rebalancer::Runner::Init(vector<string> master_addresses) {
.Build(&client_);
}
-void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) {
+Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
+ vector<ReplicaMove> replica_moves;
+ RETURN_NOT_OK(GetNextMovesImpl(&replica_moves));
+ if (replica_moves.empty() && scheduled_moves_.empty()) {
+ *has_moves = false;
+ return Status::OK();
+ }
+
+ // Filter out moves for tablets which already have operations in progress.
+ // The idea is simple: no more than one move operation per tablet should
+ // ever be attempted.
+ Rebalancer::FilterMoves(scheduled_moves_, &replica_moves);
+ LoadMoves(std::move(replica_moves));
+
+ // TODO(aserbin): now this method reports on presence of some moves even if
+ // all of those are in progress and no fresh new are available.
+ // Would it be more convenient to report only on
+ // fresh new moves and check for the presence of the scheduled
+ // moves at the upper level?
+ *has_moves = true;
+ return Status::OK();
+}
+
+void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
+ const auto op_count = op_count_per_ts_[ts_uuid]--;
+ const auto op_range = ts_per_op_count_.equal_range(op_count);
+ bool ts_per_op_count_updated = false;
+ for (auto it = op_range.first; it != op_range.second; ++it) {
+ if (it->second == ts_uuid) {
+ ts_per_op_count_.erase(it);
+ ts_per_op_count_.emplace(op_count - 1, ts_uuid);
+ ts_per_op_count_updated = true;
+ break;
+ }
+ }
+ DCHECK(ts_per_op_count_updated);
+}
+
+Rebalancer::AlgoBasedRunner::AlgoBasedRunner(
+ Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline)
+ : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+ random_generator_(random_device_()) {
+}
+
+Status Rebalancer::AlgoBasedRunner::Init(vector<string> master_addresses) {
+ DCHECK(src_op_indices_.empty());
+ DCHECK(dst_op_indices_.empty());
+ DCHECK(scheduled_moves_.empty());
+ return BaseRunner::Init(std::move(master_addresses));
+}
+
+void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
// The moves to schedule (used by subsequent calls to ScheduleNextMove()).
replica_moves_.swap(replica_moves);
@@ -758,7 +734,8 @@ void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) {
}
// Return true if replica move operation has been scheduled successfully.
-bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) {
+bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
+ bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
*has_errors = false;
@@ -811,7 +788,7 @@ bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) {
return false;
}
-bool Rebalancer::Runner::UpdateMovesInProgressStatus(
+bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
bool* has_errors, bool* timed_out) {
DCHECK(has_errors);
DCHECK(timed_out);
@@ -848,7 +825,9 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
// Erase the element and advance the iterator.
it = scheduled_moves_.erase(it);
continue;
- } else if (is_complete) {
+ }
+ DCHECK(s.ok());
+ if (is_complete) {
// The move has completed (success or failure): update the stats on the
// pending operations per server.
++moves_count_;
@@ -856,7 +835,7 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
UpdateOnMoveCompleted(it->second.ts_uuid_to);
LOG(INFO) << Substitute("tablet $0: $1 -> $2 move completed: $3",
tablet_id, src_ts_uuid, dst_ts_uuid,
- s.ok() ? move_status.ToString() : s.ToString());
+ move_status.ToString());
// Erase the element and advance the iterator.
it = scheduled_moves_.erase(it);
continue;
@@ -869,7 +848,83 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
return has_updates;
}
-bool Rebalancer::Runner::FindNextMove(size_t* op_idx) {
+// Run one step of the rebalancer. Due to the inherent restrictions of the
+// rebalancing engine, no more than one replica per tablet is moved during
+// one step of the rebalancing.
+Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
+ vector<ReplicaMove>* replica_moves) {
+ ClusterRawInfo raw_info;
+ RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(&raw_info));
+
+ // For simplicity, allow to run the rebalancing only when all tablet servers
+ // are in good shape. Otherwise, the rebalancing might interfere with the
+ // automatic re-replication or get unexpected errors while moving replicas.
+ for (const auto& s : raw_info.tserver_summaries) {
+ if (s.health != KsckServerHealth::HEALTHY) {
+ return Status::IllegalState(
+ Substitute("tablet server $0 ($1): unacceptable health status $2",
+ s.uuid, s.address, ServerHealthToString(s.health)));
+ }
+ }
+
+ // The number of operations to output by the algorithm. Those will be
+ // translated into concrete tablet replica movement operations, the output of
+ // this method.
+ const size_t max_moves = max_moves_per_server_ *
+ raw_info.tserver_summaries.size() * 5;
+
+ replica_moves->clear();
+ vector<TableReplicaMove> moves;
+ ClusterInfo cluster_info;
+ RETURN_NOT_OK(rebalancer_->BuildClusterInfo(
+ raw_info, scheduled_moves_, &cluster_info));
+ RETURN_NOT_OK(algorithm()->GetNextMoves(cluster_info, max_moves, &moves));
+ if (moves.empty()) {
+ // No suitable moves were found: the cluster described by the 'cluster_info'
+ // is balanced, assuming the pending moves, if any, will succeed.
+ return Status::OK();
+ }
+ unordered_set<string> tablets_in_move;
+ std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+ inserter(tablets_in_move, tablets_in_move.begin()),
+ [](const MovesInProgress::value_type& elem) {
+ return elem.first;
+ });
+ for (const auto& move : moves) {
+ vector<string> tablet_ids;
+ RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
+ // Shuffle the set of the tablet identifiers: that's to achieve even spread
+ // of moves across tables with the same skew.
+ std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
+ string move_tablet_id;
+ for (const auto& tablet_id : tablet_ids) {
+ if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
+ // For now, choose the very first tablet that does not have replicas
+ // in move. Later on, additional logic might be added to find
+ // the best candidate.
+ move_tablet_id = tablet_id;
+ break;
+ }
+ }
+ if (move_tablet_id.empty()) {
+ LOG(WARNING) << Substitute(
+ "table $0: could not find any suitable replica to move "
+ "from server $1 to server $2", move.table_id, move.from, move.to);
+ continue;
+ }
+ ReplicaMove info;
+ info.tablet_uuid = move_tablet_id;
+ info.ts_uuid_from = move.from;
+ info.ts_uuid_to = move.to;
+ replica_moves->emplace_back(std::move(info));
+ // Mark the tablet as 'has a replica in move'.
+ tablets_in_move.emplace(move_tablet_id);
+ }
+
+ return Status::OK();
+}
+
+bool Rebalancer::AlgoBasedRunner::FindNextMove(size_t* op_idx) {
vector<size_t> op_indices;
for (auto it = ts_per_op_count_.begin(); op_indices.empty() &&
it != ts_per_op_count_.end() && it->first < max_moves_per_server_; ++it) {
@@ -915,7 +970,7 @@ bool Rebalancer::Runner::FindNextMove(size_t* op_idx) {
return !op_indices.empty();
}
-void Rebalancer::Runner::UpdateOnMoveScheduled(
+void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
size_t idx,
const string& tablet_uuid,
const string& src_ts_uuid,
@@ -925,13 +980,14 @@ void Rebalancer::Runner::UpdateOnMoveScheduled(
Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid };
auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
// Only one replica of a tablet can be moved at a time.
- DCHECK(ins.second);
+ // TODO(aserbin): clarify on duplicates
+ //DCHECK(ins.second);
}
UpdateOnMoveScheduledImpl(idx, src_ts_uuid, is_success, &src_op_indices_);
UpdateOnMoveScheduledImpl(idx, dst_ts_uuid, is_success, &dst_op_indices_);
}
-void Rebalancer::Runner::UpdateOnMoveScheduledImpl(
+void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
size_t idx,
const string& ts_uuid,
bool is_success,
@@ -959,19 +1015,11 @@ void Rebalancer::Runner::UpdateOnMoveScheduledImpl(
}
}
-void Rebalancer::Runner::UpdateOnMoveCompleted(const string& ts_uuid) {
- const auto op_count = op_count_per_ts_[ts_uuid]--;
- const auto op_range = ts_per_op_count_.equal_range(op_count);
- bool ts_per_op_count_updated = false;
- for (auto it = op_range.first; it != op_range.second; ++it) {
- if (it->second == ts_uuid) {
- ts_per_op_count_.erase(it);
- ts_per_op_count_.emplace(op_count - 1, ts_uuid);
- ts_per_op_count_updated = true;
- break;
- }
- }
- DCHECK(ts_per_op_count_updated);
+Rebalancer::TwoDimensionalGreedyRunner::TwoDimensionalGreedyRunner(
+ Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline)
+ : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
}
} // namespace tools
http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 2b13a92..c4f5824 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -131,26 +131,21 @@ class Rebalancer {
// and track already scheduled ones.
class Runner {
public:
- // The 'max_moves_per_server' specifies the maximum number of operations
- // per tablet server (both the source and the destination are counted in).
- // The 'deadline' specifies the deadline for the run, 'boost::none'
- // if no timeout is set.
- Runner(size_t max_moves_per_server,
- const boost::optional<MonoTime>& deadline);
+ virtual ~Runner() = default;
// Initialize instance of Runner so it can run against Kudu cluster with
// the 'master_addresses' RPC endpoints.
- Status Init(std::vector<std::string> master_addresses);
+ virtual Status Init(std::vector<std::string> master_addresses) = 0;
// Load information on prescribed replica movement operations. Also,
// populate helper containers and other auxiliary run-time structures
// used by ScheduleNextMove(). This method is called with every batch
// of move operations output by the rebalancing algorithm once previously
// loaded moves have been scheduled.
- void LoadMoves(std::vector<ReplicaMove> replica_moves);
+ virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0;
// Schedule next replica move.
- bool ScheduleNextMove(bool* has_errors, bool* timed_out);
+ virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
// Update statuses and auxiliary information on in-progress replica move
// operations. The 'timed_out' parameter is set to 'true' if not all
@@ -158,39 +153,45 @@ class Rebalancer {
// the 'deadline_' member field. The method returns 'true' if it's necessary
// to clear the state of the in-progress operations, i.e. 'forget'
// those, starting from a clean state.
- bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out);
+ virtual bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) = 0;
- uint32_t moves_count() const {
- return moves_count_;
- }
+ virtual Status GetNextMoves(bool* has_moves) = 0;
- const MovesInProgress& scheduled_moves() const {
- return scheduled_moves_;
- }
+ virtual uint32_t moves_count() const = 0;
+ }; // class Runner
- private:
- // Given the data in the helper containers, find the index describing
- // the next replica move and output it into the 'op_idx' parameter.
- bool FindNextMove(size_t* op_idx);
+ // Common base for a few Runner implementations.
+ class BaseRunner : public Runner {
+ public:
+ BaseRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline);
- // Update the helper containers once a move operation has been scheduled.
- void UpdateOnMoveScheduled(size_t idx,
- const std::string& tablet_uuid,
- const std::string& src_ts_uuid,
- const std::string& dst_ts_uuid,
- bool is_success);
+ Status Init(std::vector<std::string> master_addresses) override;
- // Auxiliary method used by UpdateOnMoveScheduled() implementation.
- void UpdateOnMoveScheduledImpl(
- size_t idx,
- const std::string& ts_uuid,
- bool is_success,
- std::unordered_map<std::string, std::set<size_t>>* op_indices);
+ Status GetNextMoves(bool* has_moves) override;
+
+ uint32_t moves_count() const override {
+ return moves_count_;
+ }
+
+ protected:
+ // Get next batch of replica moves from the rebalancing algorithm.
+ // Essentially, it runs ksck against the cluster and feeds the data into the
+ // rebalancing algorithm along with the information on currently pending
+ // replica movement operations. The information returned by the high-level
+ // rebalancing algorithm is translated into particular replica movement
+ // instructions, which are used to populate the 'moves' parameter
+ // (the container is cleared first).
+ virtual Status GetNextMovesImpl(std::vector<ReplicaMove>* moves) = 0;
// Update the helper containers once a scheduled operation is complete
// (i.e. succeeded or failed).
void UpdateOnMoveCompleted(const std::string& ts_uuid);
+ // A pointer to the Rebalancer object.
+ Rebalancer* rebalancer_;
+
// Maximum allowed number of move operations per server. For a move
// operation, a source replica adds +1 at the source server and the target
// replica adds +1 at the destination server.
@@ -200,12 +201,71 @@ class Rebalancer {
// ScheduleNextMove() and UpdateMovesInProgressStatus() methods.
const boost::optional<MonoTime> deadline_;
+ // Client object to make queries to Kudu masters for various auxiliary info
+ // while scheduling move operations and monitoring their status.
+ client::sp::shared_ptr<client::KuduClient> client_;
+
+ // Information on scheduled replica movement operations; keys are
+ // tablet UUIDs, values are ReplicaMove structures.
+ MovesInProgress scheduled_moves_;
+
// Number of successfully completed replica moves operations.
uint32_t moves_count_;
// Kudu cluster RPC end-points.
std::vector<std::string> master_addresses_;
+ // Mapping 'tserver UUID' --> 'scheduled move operations count'.
+ std::unordered_map<std::string, int32_t> op_count_per_ts_;
+
+ // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
+ // just reversed 'op_count_per_ts_'.
+ std::multimap<int32_t, std::string> ts_per_op_count_;
+ }; // class BaseRunner
+
+ // Runner that leverages RebalancingAlgo interface for rebalancing.
+ class AlgoBasedRunner : public BaseRunner {
+ public:
+ // The 'max_moves_per_server' specifies the maximum number of operations
+ // per tablet server (both the source and the destination are counted in).
+ // The 'deadline' specifies the deadline for the run, 'boost::none'
+ // if no timeout is set.
+ AlgoBasedRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline);
+
+ Status Init(std::vector<std::string> master_addresses) override;
+
+ void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+
+ bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
+
+ bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
+
+ // Rebalancing algorithm that the runner uses to find replica moves.
+ virtual RebalancingAlgo* algorithm() = 0;
+
+ protected:
+ Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+
+ // Given the data in the helper containers, find the index describing
+ // the next replica move and output it into the 'op_idx' parameter.
+ bool FindNextMove(size_t* op_idx);
+
+ // Update the helper containers once a move operation has been scheduled.
+ void UpdateOnMoveScheduled(size_t idx,
+ const std::string& tablet_uuid,
+ const std::string& src_ts_uuid,
+ const std::string& dst_ts_uuid,
+ bool is_success);
+
+ // Auxiliary method used by UpdateOnMoveScheduled() implementation.
+ void UpdateOnMoveScheduledImpl(
+ size_t idx,
+ const std::string& ts_uuid,
+ bool is_success,
+ std::unordered_map<std::string, std::set<size_t>>* op_indices);
+
// The moves to schedule.
std::vector<ReplicaMove> replica_moves_;
@@ -217,23 +277,29 @@ class Rebalancer {
// tserver UUID (i.e. the key) as the destination of the move operation'.
std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
- // Mapping 'tserver UUID' --> 'scheduled move operations count'.
- std::unordered_map<std::string, int32_t> op_count_per_ts_;
+ // Random device and generator for selecting among multiple choices, when
+ // appropriate.
+ std::random_device random_device_;
+ std::mt19937 random_generator_;
+ }; // class AlgoBasedRunner
- // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
- // just reversed 'op_count_per_ts_'. Having count as key helps with finding
- // servers with minimum number of scheduled operations while scheduling
- // replica movement operations (it's necessary to preserve the
- // 'maximum-moves-per-server' constraint while doing so).
- std::multimap<int32_t, std::string> ts_per_op_count_;
+ class TwoDimensionalGreedyRunner : public AlgoBasedRunner {
+ public:
+ // The 'max_moves_per_server' specifies the maximum number of operations
+ // per tablet server (both the source and the destination are counted in).
+ // The 'deadline' specifies the deadline for the run, 'boost::none'
+ // if no timeout is set.
+ TwoDimensionalGreedyRunner(Rebalancer* rebalancer,
+ size_t max_moves_per_server,
+ boost::optional<MonoTime> deadline);
- // Information on scheduled replica movement operations; keys are
- // tablet UUIDs, values are ReplicaMove structures.
- MovesInProgress scheduled_moves_;
+ RebalancingAlgo* algorithm() override {
+ return &algorithm_;
+ }
- // Client object to make queries to Kudu masters for various auxiliary info
- // while scheduling move operations and monitoring their status.
- client::sp::shared_ptr<client::KuduClient> client_;
+ private:
+ // An instance of the balancing algorithm.
+ TwoDimensionalGreedyAlgo algorithm_;
};
friend class KsckResultsToClusterBalanceInfoTest;
@@ -246,6 +312,24 @@ class Rebalancer {
const KsckResults& ksck_info,
ClusterRawInfo* raw_info);
+ // Given high-level move-some-tablet-replica-for-a-table information from the
+ // rebalancing algorithm, find appropriate tablet replicas to move between the
+ // specified tablet servers. The set of result tablet UUIDs is output
+ // into the 'tablet_ids' container (note: the container is first cleared).
+ // The source and destination replicas are determined by the elements of the
+ // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
+ // TableReplicaMove::to, respectively. If no suitable tablet replicas are found,
+ // 'tablet_ids' will be empty with the result status of Status::OK().
+ static Status FindReplicas(const TableReplicaMove& move,
+ const ClusterRawInfo& raw_info,
+ std::vector<std::string>* tablet_ids);
+
+ // Filter move operations in 'replica_moves': remove all operations that would
+ // involve moving replicas of tablets which are in 'scheduled_moves'. The
+ // 'replica_moves' cannot be null.
+ static void FilterMoves(const MovesInProgress& scheduled_moves,
+ std::vector<ReplicaMove>* replica_moves);
+
// Convert the 'raw' information about the cluster into information suitable
// for the input of the high-level rebalancing algorithm.
// The 'moves_in_progress' parameter contains information on the replica moves
@@ -260,37 +344,18 @@ class Rebalancer {
const MovesInProgress& moves_in_progress,
ClusterInfo* info) const;
- // Get next batch of replica moves from the rebalancing algorithm.
- // Essentially, it runs ksck against the cluster and feeds the data into the
- // rebalancing algorithm along with the information on currently pending
- // replica movement operations. The information returned by the high-level
- // rebalancing algorithm is translated into particular replica movement
- // instructions, which are used to populate the 'replica_moves' parameter
- // (the container is cleared first).
- //
- // The 'moves_in_progress' parameter contains information on pending moves.
- // The results are output into 'replica_moves', which will be empty
- // if no next steps are needed to make the cluster balanced.
- Status GetNextMoves(const MovesInProgress& moves_in_progress,
- std::vector<ReplicaMove>* replica_moves);
+ // Run rebalancing using the specified runner.
+ Status RunWith(Runner* runner, RunStatus* result_status);
- // Given information from the high-level rebalancing algorithm, find
- // appropriate tablet replicas to move on the specified tablet servers.
- // The set of result UUIDs is output into the 'tablet_ids' container (note:
- // the output container is first cleared). If no suitable replicas are found,
- // 'tablet_ids' will be empty with the result status of Status::OK().
- Status FindReplicas(const TableReplicaMove& move,
- const KsckResults& ksck_info,
- std::vector<std::string>* tablet_ids) const;
+ // Refresh the information on the cluster (involves running ksck).
+ Status GetClusterRawInfo(ClusterRawInfo* raw_info);
+
+ Status GetNextMoves(Runner* runner,
+ std::vector<ReplicaMove>* replica_moves);
// Reset ksck-related fields and run ksck against the cluster.
Status RefreshKsckResults();
- // Filter out move operations at the tablets which already have operations
- // in progress. The 'replica_moves' cannot be null.
- void FilterMoves(const MovesInProgress& scheduled_moves,
- std::vector<ReplicaMove>* replica_moves);
-
// Configuration for the rebalancer.
const Config config_;