You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2018/10/29 23:09:35 UTC

[4/4] kudu git commit: [rebalancer] location-aware rebalancer (part 4/n)

[rebalancer] location-aware rebalancer (part 4/n)

Refactored Rebalancer and Runner classes, separating common base for
a runner of the rebalancing process.

Change-Id: Id47183fc853573390b22ec714751adec93e0ea3a
Reviewed-on: http://gerrit.cloudera.org:8080/11745
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Will Berkeley <wd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/34bb7f93
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/34bb7f93
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/34bb7f93

Branch: refs/heads/master
Commit: 34bb7f93b4bcb8d39803d0f0718148f84f5cca22
Parents: 43161e5
Author: Alexey Serbin <as...@cloudera.com>
Authored: Fri Oct 19 22:20:33 2018 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Oct 29 23:05:50 2018 +0000

----------------------------------------------------------------------
 src/kudu/tools/rebalancer.cc | 640 ++++++++++++++++++++------------------
 src/kudu/tools/rebalancer.h  | 209 ++++++++-----
 2 files changed, 481 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index c3225f2..abf28f0 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -39,6 +39,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tools/ksck.h"
@@ -93,9 +94,7 @@ Rebalancer::Config::Config(
 }
 
 Rebalancer::Rebalancer(const Config& config)
-    : config_(config),
-      random_device_(),
-      random_generator_(random_device_()) {
+    : config_(config) {
 }
 
 Status Rebalancer::PrintStats(std::ostream& out) {
@@ -109,13 +108,12 @@ Status Rebalancer::PrintStats(std::ostream& out) {
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
-  auto& cbi = ci.balance;
   // Per-server replica distribution stats.
   {
     out << "Per-server replica distribution summary:" << endl;
     DataTable summary({"Statistic", "Value"});
 
-    const auto& servers_load_info = cbi.servers_by_total_replica_count;
+    const auto& servers_load_info = ci.balance.servers_by_total_replica_count;
     if (servers_load_info.empty()) {
       summary.AddRow({ "N/A", "N/A" });
     } else {
@@ -159,7 +157,7 @@ Status Rebalancer::PrintStats(std::ostream& out) {
   {
     out << "Per-table replica distribution summary:" << endl;
     DataTable summary({ "Replica Skew", "Value" });
-    const auto& table_skew_info = cbi.table_info_by_skew;
+    const auto& table_skew_info = ci.balance.table_info_by_skew;
     if (table_skew_info.empty()) {
       summary.AddRow({ "N/A", "N/A" });
     } else {
@@ -225,106 +223,145 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
-  Runner runner(config_.max_moves_per_server, deadline);
+  TwoDimensionalGreedyRunner runner(this, config_.max_moves_per_server, deadline);
   RETURN_NOT_OK(runner.Init(config_.master_addresses));
+  RETURN_NOT_OK(RunWith(&runner, result_status));
+  if (moves_count != nullptr) {
+    *moves_count = runner.moves_count();
+  }
 
-  const MonoDelta max_staleness_delta =
-      MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
-  MonoTime staleness_start = MonoTime::Now();
-  bool is_timed_out = false;
-  bool resync_state = false;
-  while (!is_timed_out) {
-    if (resync_state) {
-      resync_state = false;
-      MonoDelta staleness_delta = MonoTime::Now() - staleness_start;
-      if (staleness_delta > max_staleness_delta) {
-        LOG(INFO) << Substitute("detected a staleness period of $0", staleness_delta.ToString());
-        return Status::Incomplete(Substitute(
-            "stalled with no progress for more than $0 seconds, aborting",
-            max_staleness_delta.ToString()));
-      }
-      // The actual re-synchronization happens during GetNextMoves() below:
-      // updated info is collected from the cluster and fed into the algorithm.
-      LOG(INFO) << "re-synchronizing cluster state";
-    }
+  return Status::OK();
+}
 
-    {
-      vector<Rebalancer::ReplicaMove> replica_moves;
-      RETURN_NOT_OK(GetNextMoves(runner.scheduled_moves(), &replica_moves));
-      if (replica_moves.empty() && runner.scheduled_moves().empty()) {
-        // No moves are left: done!
-        break;
-      }
+Status Rebalancer::KsckResultsToClusterRawInfo(
+    const KsckResults& ksck_info,
+    ClusterRawInfo* raw_info) {
+  DCHECK(raw_info);
 
-      // Filter out moves for tablets which already have operations in progress.
-      FilterMoves(runner.scheduled_moves(), &replica_moves);
-      runner.LoadMoves(std::move(replica_moves));
-    }
+  raw_info->tserver_summaries = ksck_info.tserver_summaries;
+  raw_info->table_summaries = ksck_info.table_summaries;
+  raw_info->tablet_summaries = ksck_info.tablet_summaries;
 
-    auto has_errors = false;
-    while (!is_timed_out) {
-      auto is_scheduled = runner.ScheduleNextMove(&has_errors, &is_timed_out);
-      resync_state |= has_errors;
-      if (resync_state || is_timed_out) {
-        break;
-      }
-      if (is_scheduled) {
-        // Reset the start of the staleness interval: there was some progress
-        // in scheduling new move operations.
-        staleness_start = MonoTime::Now();
+  return Status::OK();
+}
 
-        // Continue scheduling available move operations while there is enough
-        // capacity, i.e. until number of pending move operations on every
-        // involved tablet server reaches max_moves_per_server. Once no more
-        // operations can be scheduled, it's time to check for their status.
+// Given high-level description of moves, find tablets with replicas at the
+// corresponding tablet servers to satisfy those high-level descriptions.
+// The idea is to find all tablets of the specified table that would have a
+// replica at the source server, but would not have a replica at the destination
+// server. That is to satisfy the restriction of having no more than one replica
+// of the same tablet per server.
+//
+// An additional constraint: it's better not to move leader replicas, if
+// possible. If a client has a write operation in progress, moving leader
+// replicas of affected tablets would make the client to re-resolve new leaders
+// and retry the operations. Moving leader replicas is used as last resort
+// when no other candidates are left.
+Status Rebalancer::FindReplicas(const TableReplicaMove& move,
+                                const ClusterRawInfo& raw_info,
+                                vector<string>* tablet_ids) {
+  const auto& table_id = move.table_id;
+
+  // Tablet ids of replicas on the source tserver that are non-leaders.
+  vector<string> tablet_uuids_src;
+  // Tablet ids of replicas on the source tserver that are leaders.
+  vector<string> tablet_uuids_src_leaders;
+  // UUIDs of tablets of the selected table at the destination tserver.
+  vector<string> tablet_uuids_dst;
+
+  for (const auto& tablet_summary : raw_info.tablet_summaries) {
+    if (tablet_summary.table_id != table_id) {
+      continue;
+    }
+    if (tablet_summary.result != KsckCheckResult::HEALTHY) {
+      VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
+                            "as candidates for movement since the tablet's "
+                            "status is '$2'",
+                            table_id, tablet_summary.id,
+                            KsckCheckResultToString(tablet_summary.result));
+      continue;
+    }
+    for (const auto& replica_summary : tablet_summary.replicas) {
+      if (replica_summary.ts_uuid != move.from &&
+          replica_summary.ts_uuid != move.to) {
         continue;
       }
-
-      // Poll for the status of pending operations. If some of the in-flight
-      // operations are complete, it might be possible to schedule new ones
-      // by calling Runner::ScheduleNextMove().
-      auto has_updates = runner.UpdateMovesInProgressStatus(&has_errors,
-                                                            &is_timed_out);
-      if (has_updates) {
-        // Reset the start of the staleness interval: there was some updates
-        // on the status of scheduled move operations.
-        staleness_start = MonoTime::Now();
+      if (!replica_summary.ts_healthy) {
+        VLOG(1) << Substitute("table $0: not considering replica movement "
+                              "from $1 to $2 since server $3 is not healthy",
+                              table_id,
+                              move.from, move.to, replica_summary.ts_uuid);
+        continue;
       }
-      resync_state |= has_errors;
-      if (resync_state || is_timed_out || !has_updates) {
-        // If there were errors while trying to get the statuses of pending
-        // operations it's necessary to re-synchronize the state of the cluster:
-        // most likely something has changed, so it's better to get a new set
-        // of planned moves.
-        break;
+      if (replica_summary.ts_uuid == move.from) {
+        if (replica_summary.is_leader) {
+          tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
+        } else {
+          tablet_uuids_src.emplace_back(tablet_summary.id);
+        }
+      } else {
+        DCHECK_EQ(move.to, replica_summary.ts_uuid);
+        tablet_uuids_dst.emplace_back(tablet_summary.id);
       }
-
-      // Sleep a bit before going next cycle of status polling.
-      SleepFor(MonoDelta::FromMilliseconds(200));
     }
   }
+  sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
+  sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
 
-  *result_status = is_timed_out ? RunStatus::TIMED_OUT
-                                : RunStatus::CLUSTER_IS_BALANCED;
-  if (moves_count) {
-    *moves_count = runner.moves_count();
-  }
+  vector<string> tablet_uuids;
+  set_difference(
+      tablet_uuids_src.begin(), tablet_uuids_src.end(),
+      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+      inserter(tablet_uuids, tablet_uuids.begin()));
 
-  return Status::OK();
-}
+  if (!tablet_uuids.empty()) {
+    // If there are tablets with non-leader replicas at the source server,
+    // those are the best candidates for movement.
+    tablet_ids->swap(tablet_uuids);
+    return Status::OK();
+  }
 
-Status Rebalancer::KsckResultsToClusterRawInfo(
-    const KsckResults& ksck_info,
-    ClusterRawInfo* raw_info) {
-  DCHECK(raw_info);
+  // If no tablets with non-leader replicas were found, resort to tablets with
+  // leader replicas at the source server.
+  DCHECK(tablet_uuids.empty());
+  sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
+  set_difference(
+      tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
+      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
+      inserter(tablet_uuids, tablet_uuids.begin()));
 
-  raw_info->tserver_summaries = ksck_info.tserver_summaries;
-  raw_info->table_summaries = ksck_info.table_summaries;
-  raw_info->tablet_summaries = ksck_info.tablet_summaries;
+  tablet_ids->swap(tablet_uuids);
 
   return Status::OK();
 }
 
+void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
+                             vector<ReplicaMove>* replica_moves) {
+  unordered_set<string> tablet_uuids;
+  vector<ReplicaMove> filtered_replica_moves;
+  for (auto& move_op : *replica_moves) {
+    const auto& tablet_uuid = move_op.tablet_uuid;
+    if (ContainsKey(scheduled_moves, tablet_uuid)) {
+      // There is a move operation in progress for the tablet, don't schedule
+      // another one.
+      continue;
+    }
+    if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
+      filtered_replica_moves.emplace_back(std::move(move_op));
+    } else {
+      // Rationale behind the unique tablet constraint: the implementation of
+      // the Run() method is designed to re-order operations suggested by the
+      // high-level algorithm to use the op-count-per-tablet-server capacity
+      // as much as possible. Right now, the RunStep() method outputs only one
+      // move operation per tablet in every batch. The code below is to
+      // enforce the contract between Run() and RunStep() methods.
+      LOG(DFATAL) << "detected multiple replica move operations for the same "
+                     "tablet " << tablet_uuid;
+    }
+  }
+  *replica_moves = std::move(filtered_replica_moves);
+}
+
 Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                                     const MovesInProgress& moves_in_progress,
                                     ClusterInfo* info) const {
@@ -463,175 +500,88 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
   return Status::OK();
 }
 
-
-// Run one step of the rebalancer. Due to the inherent restrictions of the
-// rebalancing engine, no more than one replica per tablet is moved during
-// one step of the rebalancing.
-Status Rebalancer::GetNextMoves(const MovesInProgress& moves_in_progress,
-                                vector<ReplicaMove>* replica_moves) {
-  RETURN_NOT_OK(RefreshKsckResults());
-  const auto& ksck_info = ksck_->results();
-
-  // For simplicity, allow to run the rebalancing only when all tablet servers
-  // are in good shape. Otherwise, the rebalancing might interfere with the
-  // automatic re-replication or get unexpected errors while moving replicas.
-  for (const auto& s : ksck_info.tserver_summaries) {
-    if (s.health != KsckServerHealth::HEALTHY) {
-      return Status::IllegalState(
-          Substitute("tablet server $0 ($1): unacceptable health status $2",
-                     s.uuid, s.address, ServerHealthToString(s.health)));
+Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) {
+  const MonoDelta max_staleness_delta =
+      MonoDelta::FromSeconds(config_.max_staleness_interval_sec);
+  MonoTime staleness_start = MonoTime::Now();
+  bool is_timed_out = false;
+  bool resync_state = false;
+  while (!is_timed_out) {
+    if (resync_state) {
+      resync_state = false;
+      MonoDelta staleness_delta = MonoTime::Now() - staleness_start;
+      if (staleness_delta > max_staleness_delta) {
+        LOG(INFO) << Substitute("detected a staleness period of $0",
+                                staleness_delta.ToString());
+        return Status::Incomplete(Substitute(
+            "stalled with no progress for more than $0 seconds, aborting",
+            max_staleness_delta.ToString()));
+      }
+      // The actual re-synchronization happens during GetNextMoves() below:
+      // updated info is collected from the cluster and fed into the algorithm.
+      LOG(INFO) << "re-synchronizing cluster state";
     }
-  }
-
-  // The number of operations to output by the algorithm. Those will be
-  // translated into concrete tablet replica movement operations, the output of
-  // this method.
-  const size_t max_moves = config_.max_moves_per_server *
-      ksck_info.tserver_summaries.size() * 5;
 
-  ClusterRawInfo raw_info;
-  RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_info, &raw_info));
+    bool has_more_moves = false;
+    RETURN_NOT_OK(runner->GetNextMoves(&has_more_moves));
+    if (!has_more_moves) {
+      // No moves are left, done!
+      break;
+    }
 
-  replica_moves->clear();
-  vector<TableReplicaMove> moves;
-  ClusterInfo cluster_info;
-  RETURN_NOT_OK(BuildClusterInfo(raw_info, moves_in_progress, &cluster_info));
-  RETURN_NOT_OK(algo_.GetNextMoves(cluster_info, max_moves, &moves));
-  if (moves.empty()) {
-    // No suitable moves were found: the cluster is balanced,
-    // assuming all pending moves, if any, will succeed.
-    return Status::OK();
-  }
-  unordered_set<string> tablets_in_move;
-  std::transform(moves_in_progress.begin(), moves_in_progress.end(),
-                 inserter(tablets_in_move, tablets_in_move.begin()),
-                 [](const MovesInProgress::value_type& elem) {
-                   return elem.first;
-                 });
-  for (const auto& move : moves) {
-    vector<string> tablet_ids;
-    RETURN_NOT_OK(FindReplicas(move, ksck_info, &tablet_ids));
-    // Shuffle the set of the tablet identifiers: that's to achieve even spread
-    // of moves across tables with the same skew.
-    std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
-    string move_tablet_id;
-    for (const auto& tablet_id : tablet_ids) {
-      if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
-        // For now, choose the very first tablet that does not have replicas
-        // in move. Later on, additional logic might be added to find
-        // the best candidate.
-        move_tablet_id = tablet_id;
+    auto has_errors = false;
+    while (!is_timed_out) {
+      auto is_scheduled = runner->ScheduleNextMove(&has_errors, &is_timed_out);
+      resync_state |= has_errors;
+      if (resync_state || is_timed_out) {
         break;
       }
-    }
-    if (move_tablet_id.empty()) {
-      LOG(WARNING) << Substitute(
-          "table $0: could not find any suitable replica to move "
-          "from server $1 to server $2", move.table_id, move.from, move.to);
-      continue;
-    }
-    ReplicaMove info;
-    info.tablet_uuid = move_tablet_id;
-    info.ts_uuid_from = move.from;
-    info.ts_uuid_to = move.to;
-    replica_moves->emplace_back(std::move(info));
-    // Mark the tablet as 'has a replica in move'.
-    tablets_in_move.emplace(move_tablet_id);
-  }
-
-  return Status::OK();
-}
-
-// Given high-level description of moves, find tablets with replicas at the
-// corresponding tablet servers to satisfy those high-level descriptions.
-// The idea is to find all tablets of the specified table that would have a
-// replica at the source server, but would not have a replica at the destination
-// server. That is to satisfy the restriction of having no more than one replica
-// of the same tablet per server.
-//
-// An additional constraint: it's better not to move leader replicas, if
-// possible. If a client has a write operation in progress, moving leader
-// replicas of affected tablets would make the client to re-resolve new leaders
-// and retry the operations. Moving leader replicas is used as last resort
-// when no other candidates are left.
-Status Rebalancer::FindReplicas(const TableReplicaMove& move,
-                                const KsckResults& ksck_info,
-                                vector<string>* tablet_ids) const {
-  const auto& table_id = move.table_id;
-
-  // Tablet ids of replicas on the source tserver that are non-leaders.
-  vector<string> tablet_uuids_src;
-  // Tablet ids of replicas on the source tserver that are leaders.
-  vector<string> tablet_uuids_src_leaders;
-  // UUIDs of tablets of the selected table at the destination tserver.
-  vector<string> tablet_uuids_dst;
+      if (is_scheduled) {
+        // Reset the start of the staleness interval: there was some progress
+        // in scheduling new move operations.
+        staleness_start = MonoTime::Now();
 
-  for (const auto& tablet_summary : ksck_info.tablet_summaries) {
-    if (tablet_summary.table_id != table_id) {
-      continue;
-    }
-    if (tablet_summary.result != KsckCheckResult::HEALTHY) {
-      VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 "
-                            "as candidates for movement since the tablet's "
-                            "status is '$2'",
-                            table_id, tablet_summary.id,
-                            KsckCheckResultToString(tablet_summary.result));
-      continue;
-    }
-    for (const auto& replica_summary : tablet_summary.replicas) {
-      if (replica_summary.ts_uuid != move.from &&
-          replica_summary.ts_uuid != move.to) {
+        // Continue scheduling available move operations while there is enough
+        // capacity, i.e. until number of pending move operations on every
+        // involved tablet server reaches max_moves_per_server. Once no more
+        // operations can be scheduled, it's time to check for their status.
         continue;
       }
-      if (!replica_summary.ts_healthy) {
-        VLOG(1) << Substitute("table $0: not considering replica movement "
-                              "from $1 to $2 since server $3 is not healthy",
-                              table_id,
-                              move.from, move.to, replica_summary.ts_uuid);
-        continue;
+
+      // Poll for the status of pending operations. If some of the in-flight
+      // operations are complete, it might be possible to schedule new ones
+      // by calling Runner::ScheduleNextMove().
+      auto has_updates = runner->UpdateMovesInProgressStatus(&has_errors,
+                                                             &is_timed_out);
+      if (has_updates) {
+        // Reset the start of the staleness interval: there was some updates
+        // on the status of scheduled move operations.
+        staleness_start = MonoTime::Now();
       }
-      if (replica_summary.ts_uuid == move.from) {
-        if (replica_summary.is_leader) {
-          tablet_uuids_src_leaders.emplace_back(tablet_summary.id);
-        } else {
-          tablet_uuids_src.emplace_back(tablet_summary.id);
-        }
-      } else {
-        DCHECK_EQ(move.to, replica_summary.ts_uuid);
-        tablet_uuids_dst.emplace_back(tablet_summary.id);
+      resync_state |= has_errors;
+      if (resync_state || is_timed_out || !has_updates) {
+        // If there were errors while trying to get the statuses of pending
+        // operations it's necessary to re-synchronize the state of the cluster:
+        // most likely something has changed, so it's better to get a new set
+        // of planned moves.
+        break;
       }
-    }
-  }
-  sort(tablet_uuids_src.begin(), tablet_uuids_src.end());
-  sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end());
 
-  vector<string> tablet_uuids;
-  set_difference(
-      tablet_uuids_src.begin(), tablet_uuids_src.end(),
-      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
-      inserter(tablet_uuids, tablet_uuids.begin()));
-
-  if (!tablet_uuids.empty()) {
-    // If there are tablets with non-leader replicas at the source server,
-    // those are the best candidates for movement.
-    tablet_ids->swap(tablet_uuids);
-    return Status::OK();
+      // Sleep a bit before going next cycle of status polling.
+      SleepFor(MonoDelta::FromMilliseconds(200));
+    }
   }
 
-  // If no tablets with non-leader replicas were found, resort to tablets with
-  // leader replicas at the source server.
-  DCHECK(tablet_uuids.empty());
-  sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end());
-  set_difference(
-      tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(),
-      tablet_uuids_dst.begin(), tablet_uuids_dst.end(),
-      inserter(tablet_uuids, tablet_uuids.begin()));
-
-  tablet_ids->swap(tablet_uuids);
-
+  *result_status = is_timed_out ? RunStatus::TIMED_OUT
+                                : RunStatus::CLUSTER_IS_BALANCED;
   return Status::OK();
 }
 
+Status Rebalancer::GetClusterRawInfo(ClusterRawInfo* raw_info) {
+  RETURN_NOT_OK(RefreshKsckResults());
+  return KsckResultsToClusterRawInfo(ksck_->results(), raw_info);
+}
+
 Status Rebalancer::RefreshKsckResults() {
   shared_ptr<KsckCluster> cluster;
   RETURN_NOT_OK_PREPEND(
@@ -643,47 +593,20 @@ Status Rebalancer::RefreshKsckResults() {
   return Status::OK();
 }
 
-void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
-                             vector<ReplicaMove>* replica_moves) {
-  unordered_set<string> tablet_uuids;
-  vector<ReplicaMove> filtered_replica_moves;
-  for (auto&& move_op : *replica_moves) {
-    const auto& tablet_uuid = move_op.tablet_uuid;
-    if (scheduled_moves.find(tablet_uuid) != scheduled_moves.end()) {
-      // There is a move operation in progress for the tablet, don't schedule
-      // another one.
-      continue;
-    }
-    if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) {
-      filtered_replica_moves.push_back(std::move(move_op));
-    } else {
-      // Rationale behind the unique tablet constraint: the implementation of
-      // the Run() method is designed to re-order operations suggested by the
-      // high-level algorithm to use the op-count-per-tablet-server capacity
-      // as much as possible. Right now, the RunStep() method outputs only one
-      // move operation per tablet in every batch. The code below is to
-      // enforce the contract between Run() and RunStep() methods.
-      LOG(DFATAL) << "detected multiple replica move operations for the same "
-                     "tablet " << tablet_uuid;
-    }
-  }
-  replica_moves->swap(filtered_replica_moves);
-}
-
-Rebalancer::Runner::Runner(size_t max_moves_per_server,
-                           const boost::optional<MonoTime>& deadline)
-    : max_moves_per_server_(max_moves_per_server),
-      deadline_(deadline),
+Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer,
+                                   size_t max_moves_per_server,
+                                   boost::optional<MonoTime> deadline)
+    : rebalancer_(rebalancer),
+      max_moves_per_server_(max_moves_per_server),
+      deadline_(std::move(deadline)),
       moves_count_(0) {
+  CHECK(rebalancer_);
 }
 
-Status Rebalancer::Runner::Init(vector<string> master_addresses) {
+Status Rebalancer::BaseRunner::Init(vector<string> master_addresses) {
   DCHECK_EQ(0, moves_count_);
-  DCHECK(src_op_indices_.empty());
-  DCHECK(dst_op_indices_.empty());
   DCHECK(op_count_per_ts_.empty());
   DCHECK(ts_per_op_count_.empty());
-  DCHECK(scheduled_moves_.empty());
   DCHECK(master_addresses_.empty());
   DCHECK(client_.get() == nullptr);
   master_addresses_ = std::move(master_addresses);
@@ -692,7 +615,60 @@ Status Rebalancer::Runner::Init(vector<string> master_addresses) {
       .Build(&client_);
 }
 
-void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) {
+Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) {
+  vector<ReplicaMove> replica_moves;
+  RETURN_NOT_OK(GetNextMovesImpl(&replica_moves));
+  if (replica_moves.empty() && scheduled_moves_.empty()) {
+    *has_moves = false;
+    return Status::OK();
+  }
+
+  // Filter out moves for tablets which already have operations in progress.
+  // The idea is simple: no more than one move operation per tablet should
+  // ever be attempted.
+  Rebalancer::FilterMoves(scheduled_moves_, &replica_moves);
+  LoadMoves(std::move(replica_moves));
+
+  // TODO(aserbin): now this method reports on presence of some moves even if
+  //                all of those are in progress and no fresh new are available.
+  //                Would it be more convenient for to report only on the
+  //                fresh new moves and check for the presence of the scheduled
+  //                moves at the upper level?
+  *has_moves = true;
+  return Status::OK();
+}
+
+void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
+  const auto op_count = op_count_per_ts_[ts_uuid]--;
+  const auto op_range = ts_per_op_count_.equal_range(op_count);
+  bool ts_per_op_count_updated = false;
+  for (auto it = op_range.first; it != op_range.second; ++it) {
+    if (it->second == ts_uuid) {
+      ts_per_op_count_.erase(it);
+      ts_per_op_count_.emplace(op_count - 1, ts_uuid);
+      ts_per_op_count_updated = true;
+      break;
+    }
+  }
+  DCHECK(ts_per_op_count_updated);
+}
+
+Rebalancer::AlgoBasedRunner::AlgoBasedRunner(
+    Rebalancer* rebalancer,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline)
+    : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+      random_generator_(random_device_()) {
+}
+
+Status Rebalancer::AlgoBasedRunner::Init(vector<string> master_addresses) {
+  DCHECK(src_op_indices_.empty());
+  DCHECK(dst_op_indices_.empty());
+  DCHECK(scheduled_moves_.empty());
+  return BaseRunner::Init(std::move(master_addresses));
+}
+
+void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) {
   // The moves to schedule (used by subsequent calls to ScheduleNextMove()).
   replica_moves_.swap(replica_moves);
 
@@ -758,7 +734,8 @@ void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) {
 }
 
 // Return true if replica move operation has been scheduled successfully.
-bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) {
+bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors,
+                                                   bool* timed_out) {
   DCHECK(has_errors);
   DCHECK(timed_out);
   *has_errors = false;
@@ -811,7 +788,7 @@ bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) {
   return false;
 }
 
-bool Rebalancer::Runner::UpdateMovesInProgressStatus(
+bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus(
     bool* has_errors, bool* timed_out) {
   DCHECK(has_errors);
   DCHECK(timed_out);
@@ -848,7 +825,9 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
       // Erase the element and advance the iterator.
       it = scheduled_moves_.erase(it);
       continue;
-    } else if (is_complete) {
+    }
+    DCHECK(s.ok());
+    if (is_complete) {
       // The move has completed (success or failure): update the stats on the
       // pending operations per server.
       ++moves_count_;
@@ -856,7 +835,7 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
       UpdateOnMoveCompleted(it->second.ts_uuid_to);
       LOG(INFO) << Substitute("tablet $0: $1 -> $2 move completed: $3",
                               tablet_id, src_ts_uuid, dst_ts_uuid,
-                              s.ok() ? move_status.ToString() : s.ToString());
+                              move_status.ToString());
       // Erase the element and advance the iterator.
       it = scheduled_moves_.erase(it);
       continue;
@@ -869,7 +848,83 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus(
   return has_updates;
 }
 
-bool Rebalancer::Runner::FindNextMove(size_t* op_idx) {
+// Run one step of the rebalancer. Due to the inherent restrictions of the
+// rebalancing engine, no more than one replica per tablet is moved during
+// one step of the rebalancing.
+Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
+    vector<ReplicaMove>* replica_moves) {
+  ClusterRawInfo raw_info;
+  RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(&raw_info));
+
+  // For simplicity, allow to run the rebalancing only when all tablet servers
+  // are in good shape. Otherwise, the rebalancing might interfere with the
+  // automatic re-replication or get unexpected errors while moving replicas.
+  for (const auto& s : raw_info.tserver_summaries) {
+    if (s.health != KsckServerHealth::HEALTHY) {
+      return Status::IllegalState(
+          Substitute("tablet server $0 ($1): unacceptable health status $2",
+                     s.uuid, s.address, ServerHealthToString(s.health)));
+    }
+  }
+
+  // The number of operations to output by the algorithm. Those will be
+  // translated into concrete tablet replica movement operations, the output of
+  // this method.
+  const size_t max_moves = max_moves_per_server_ *
+      raw_info.tserver_summaries.size() * 5;
+
+  replica_moves->clear();
+  vector<TableReplicaMove> moves;
+  ClusterInfo cluster_info;
+  RETURN_NOT_OK(rebalancer_->BuildClusterInfo(
+      raw_info, scheduled_moves_, &cluster_info));
+  RETURN_NOT_OK(algorithm()->GetNextMoves(cluster_info, max_moves, &moves));
+  if (moves.empty()) {
+    // No suitable moves were found: the cluster described by the 'cluster_info'
+    // is balanced, assuming the pending moves, if any, will succeed.
+    return Status::OK();
+  }
+  unordered_set<string> tablets_in_move;
+  std::transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+                 inserter(tablets_in_move, tablets_in_move.begin()),
+                 [](const MovesInProgress::value_type& elem) {
+                   return elem.first;
+                 });
+  for (const auto& move : moves) {
+    vector<string> tablet_ids;
+    RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
+    // Shuffle the set of the tablet identifiers: that's to achieve even spread
+    // of moves across tables with the same skew.
+    std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
+    string move_tablet_id;
+    for (const auto& tablet_id : tablet_ids) {
+      if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
+        // For now, choose the very first tablet that does not have replicas
+        // in move. Later on, additional logic might be added to find
+        // the best candidate.
+        move_tablet_id = tablet_id;
+        break;
+      }
+    }
+    if (move_tablet_id.empty()) {
+      LOG(WARNING) << Substitute(
+          "table $0: could not find any suitable replica to move "
+          "from server $1 to server $2", move.table_id, move.from, move.to);
+      continue;
+    }
+    ReplicaMove info;
+    info.tablet_uuid = move_tablet_id;
+    info.ts_uuid_from = move.from;
+    info.ts_uuid_to = move.to;
+    replica_moves->emplace_back(std::move(info));
+    // Mark the tablet as 'has a replica in move'.
+    tablets_in_move.emplace(move_tablet_id);
+  }
+
+  return Status::OK();
+}
+
+bool Rebalancer::AlgoBasedRunner::FindNextMove(size_t* op_idx) {
   vector<size_t> op_indices;
   for (auto it = ts_per_op_count_.begin(); op_indices.empty() &&
        it != ts_per_op_count_.end() && it->first < max_moves_per_server_; ++it) {
@@ -915,7 +970,7 @@ bool Rebalancer::Runner::FindNextMove(size_t* op_idx) {
   return !op_indices.empty();
 }
 
-void Rebalancer::Runner::UpdateOnMoveScheduled(
+void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled(
     size_t idx,
     const string& tablet_uuid,
     const string& src_ts_uuid,
@@ -925,13 +980,14 @@ void Rebalancer::Runner::UpdateOnMoveScheduled(
     Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid };
     auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info));
     // Only one replica of a tablet can be moved at a time.
-    DCHECK(ins.second);
+    // TODO(aserbin): clarify on duplicates
+    //DCHECK(ins.second);
   }
   UpdateOnMoveScheduledImpl(idx, src_ts_uuid, is_success, &src_op_indices_);
   UpdateOnMoveScheduledImpl(idx, dst_ts_uuid, is_success, &dst_op_indices_);
 }
 
-void Rebalancer::Runner::UpdateOnMoveScheduledImpl(
+void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
     size_t idx,
     const string& ts_uuid,
     bool is_success,
@@ -959,19 +1015,11 @@ void Rebalancer::Runner::UpdateOnMoveScheduledImpl(
   }
 }
 
-void Rebalancer::Runner::UpdateOnMoveCompleted(const string& ts_uuid) {
-  const auto op_count = op_count_per_ts_[ts_uuid]--;
-  const auto op_range = ts_per_op_count_.equal_range(op_count);
-  bool ts_per_op_count_updated = false;
-  for (auto it = op_range.first; it != op_range.second; ++it) {
-    if (it->second == ts_uuid) {
-      ts_per_op_count_.erase(it);
-      ts_per_op_count_.emplace(op_count - 1, ts_uuid);
-      ts_per_op_count_updated = true;
-      break;
-    }
-  }
-  DCHECK(ts_per_op_count_updated);
+Rebalancer::TwoDimensionalGreedyRunner::TwoDimensionalGreedyRunner(
+    Rebalancer* rebalancer,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline)
+    : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
 }
 
 } // namespace tools

http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index 2b13a92..c4f5824 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -131,26 +131,21 @@ class Rebalancer {
   // and track already scheduled ones.
   class Runner {
    public:
-    // The 'max_moves_per_server' specifies the maximum number of operations
-    // per tablet server (both the source and the destination are counted in).
-    // The 'deadline' specifies the deadline for the run, 'boost::none'
-    // if no timeout is set.
-    Runner(size_t max_moves_per_server,
-           const boost::optional<MonoTime>& deadline);
+    virtual ~Runner() = default;
 
     // Initialize instance of Runner so it can run against Kudu cluster with
     // the 'master_addresses' RPC endpoints.
-    Status Init(std::vector<std::string> master_addresses);
+    virtual Status Init(std::vector<std::string> master_addresses) = 0;
 
     // Load information on prescribed replica movement operations. Also,
     // populate helper containers and other auxiliary run-time structures
     // used by ScheduleNextMove(). This method is called with every batch
     // of move operations output by the rebalancing algorithm once previously
     // loaded moves have been scheduled.
-    void LoadMoves(std::vector<ReplicaMove> replica_moves);
+    virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0;
 
     // Schedule next replica move.
-    bool ScheduleNextMove(bool* has_errors, bool* timed_out);
+    virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0;
 
     // Update statuses and auxiliary information on in-progress replica move
     // operations. The 'timed_out' parameter is set to 'true' if not all
@@ -158,39 +153,45 @@ class Rebalancer {
     // the 'deadline_' member field. The method returns 'true' if it's necessary
     // to clear the state of the in-progress operations, i.e. 'forget'
     // those, starting from a clean state.
-    bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out);
+    virtual bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) = 0;
 
-    uint32_t moves_count() const {
-      return moves_count_;
-    }
+    virtual Status GetNextMoves(bool* has_moves) = 0;
 
-    const MovesInProgress& scheduled_moves() const {
-      return scheduled_moves_;
-    }
+    virtual uint32_t moves_count() const = 0;
+  }; // class Runner
 
-   private:
-    // Given the data in the helper containers, find the index describing
-    // the next replica move and output it into the 'op_idx' parameter.
-    bool FindNextMove(size_t* op_idx);
+  // Common base for a few Runner implementations.
+  class BaseRunner : public Runner {
+   public:
+    BaseRunner(Rebalancer* rebalancer,
+               size_t max_moves_per_server,
+               boost::optional<MonoTime> deadline);
 
-    // Update the helper containers once a move operation has been scheduled.
-    void UpdateOnMoveScheduled(size_t idx,
-                               const std::string& tablet_uuid,
-                               const std::string& src_ts_uuid,
-                               const std::string& dst_ts_uuid,
-                               bool is_success);
+    Status Init(std::vector<std::string> master_addresses) override;
 
-    // Auxiliary method used by UpdateOnMoveScheduled() implementation.
-    void UpdateOnMoveScheduledImpl(
-        size_t idx,
-        const std::string& ts_uuid,
-        bool is_success,
-        std::unordered_map<std::string, std::set<size_t>>* op_indices);
+    Status GetNextMoves(bool* has_moves) override;
+
+    uint32_t moves_count() const override {
+      return moves_count_;
+    }
+
+   protected:
+    // Get next batch of replica moves from the rebalancing algorithm.
+    // Essentially, it runs ksck against the cluster and feeds the data into the
+    // rebalancing algorithm along with the information on currently pending
+    // replica movement operations. The information returned by the high-level
+    // rebalancing algorithm is translated into particular replica movement
+    // instructions, which are used to populate the 'replica_moves' parameter
+    // (the container is cleared first).
+    virtual Status GetNextMovesImpl(std::vector<ReplicaMove>* moves) = 0;
 
     // Update the helper containers once a scheduled operation is complete
     // (i.e. succeeded or failed).
     void UpdateOnMoveCompleted(const std::string& ts_uuid);
 
+    // A pointer to the Rebalancer object.
+    Rebalancer* rebalancer_;
+
     // Maximum allowed number of move operations per server. For a move
     // operation, a source replica adds +1 at the source server and the target
     // replica adds +1 at the destination server.
@@ -200,12 +201,71 @@ class Rebalancer {
     // ScheduleNextMoves() and UpadteMovesInProgressStatus() methods.
     const boost::optional<MonoTime> deadline_;
 
+    // Client object to make queries to Kudu masters for various auxiliary info
+    // while scheduling move operations and monitoring their status.
+    client::sp::shared_ptr<client::KuduClient> client_;
+
+    // Information on scheduled replica movement operations; keys are
+    // tablet UUIDs, values are ReplicaMove structures.
+    MovesInProgress scheduled_moves_;
+
     // Number of successfully completed replica moves operations.
     uint32_t moves_count_;
 
     // Kudu cluster RPC end-points.
     std::vector<std::string> master_addresses_;
 
+    // Mapping 'tserver UUID' --> 'scheduled move operations count'.
+    std::unordered_map<std::string, int32_t> op_count_per_ts_;
+
+    // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
+    // just reversed 'op_count_per_ts_'.
+    std::multimap<int32_t, std::string> ts_per_op_count_;
+  }; // class BaseRunner
+
+  // Runner that leverages RebalancingAlgo interface for rebalancing.
+  class AlgoBasedRunner : public BaseRunner {
+   public:
+    // The 'max_moves_per_server' specifies the maximum number of operations
+    // per tablet server (both the source and the destination are counted in).
+    // The 'deadline' specifies the deadline for the run, 'boost::none'
+    // if no timeout is set.
+    AlgoBasedRunner(Rebalancer* rebalancer,
+                    size_t max_moves_per_server,
+                    boost::optional<MonoTime> deadline);
+
+    Status Init(std::vector<std::string> master_addresses) override;
+
+    void LoadMoves(std::vector<ReplicaMove> replica_moves) override;
+
+    bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;
+
+    bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override;
+
+    // Rebalancing algorithm that running uses to find replica moves.
+    virtual RebalancingAlgo* algorithm() = 0;
+
+   protected:
+    Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override;
+
+    // Given the data in the helper containers, find the index describing
+    // the next replica move and output it into the 'op_idx' parameter.
+    bool FindNextMove(size_t* op_idx);
+
+    // Update the helper containers once a move operation has been scheduled.
+    void UpdateOnMoveScheduled(size_t idx,
+                               const std::string& tablet_uuid,
+                               const std::string& src_ts_uuid,
+                               const std::string& dst_ts_uuid,
+                               bool is_success);
+
+    // Auxiliary method used by UpdateOnMoveScheduled() implementation.
+    void UpdateOnMoveScheduledImpl(
+        size_t idx,
+        const std::string& ts_uuid,
+        bool is_success,
+        std::unordered_map<std::string, std::set<size_t>>* op_indices);
+
     // The moves to schedule.
     std::vector<ReplicaMove> replica_moves_;
 
@@ -217,23 +277,29 @@ class Rebalancer {
     // tserver UUID (i.e. the key) as the destination of the move operation'.
     std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;
 
-    // Mapping 'tserver UUID' --> 'scheduled move operations count'.
-    std::unordered_map<std::string, int32_t> op_count_per_ts_;
+    // Random device and generator for selecting among multiple choices, when
+    // appropriate.
+    std::random_device random_device_;
+    std::mt19937 random_generator_;
+  }; // class AlgoBasedRunner
 
-    // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
-    // just reversed 'op_count_per_ts_'. Having count as key helps with finding
-    // servers with minimum number of scheduled operations while scheduling
-    // replica movement operations (it's necessary to preserve the
-    // 'maximum-moves-per-server' constraint while doing so).
-    std::multimap<int32_t, std::string> ts_per_op_count_;
+  class TwoDimensionalGreedyRunner : public AlgoBasedRunner {
+   public:
+    // The 'max_moves_per_server' specifies the maximum number of operations
+    // per tablet server (both the source and the destination are counted in).
+    // The 'deadline' specifies the deadline for the run, 'boost::none'
+    // if no timeout is set.
+    TwoDimensionalGreedyRunner(Rebalancer* rebalancer,
+                               size_t max_moves_per_server,
+                               boost::optional<MonoTime> deadline);
 
-    // Information on scheduled replica movement operations; keys are
-    // tablet UUIDs, values are ReplicaMove structures.
-    MovesInProgress scheduled_moves_;
+    RebalancingAlgo* algorithm() override {
+      return &algorithm_;
+    }
 
-    // Client object to make queries to Kudu masters for various auxiliary info
-    // while scheduling move operations and monitoring their status.
-    client::sp::shared_ptr<client::KuduClient> client_;
+   private:
+    // An instance of the balancing algorithm.
+    TwoDimensionalGreedyAlgo algorithm_;
   };
 
   friend class KsckResultsToClusterBalanceInfoTest;
@@ -246,6 +312,24 @@ class Rebalancer {
       const KsckResults& ksck_info,
       ClusterRawInfo* raw_info);
 
+  // Given high-level move-some-tablet-replica-for-a-table information from the
+  // rebalancing algorithm, find appropriate tablet replicas to move between the
+  // specified tablet servers. The set of result tablet UUIDs is output
+  // into the 'tablet_ids' container (note: the container is first cleared).
+  // The source and destination replicas are determined by the elements of the
+  // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
+  // TableReplica::to correspondingly. If no suitable tablet replicas are found,
+  // 'tablet_ids' will be empty with the result status of Status::OK().
+  static Status FindReplicas(const TableReplicaMove& move,
+                             const ClusterRawInfo& raw_info,
+                             std::vector<std::string>* tablet_ids);
+
+  // Filter move operations in 'replica_moves': remove all operations that would
+  // involve moving replicas of tablets which are in 'scheduled_moves'. The
+  // 'replica_moves' cannot be null.
+  static void FilterMoves(const MovesInProgress& scheduled_moves,
+                          std::vector<ReplicaMove>* replica_moves);
+
   // Convert the 'raw' information about the cluster into information suitable
   // for the input of the high-level rebalancing algorithm.
   // The 'moves_in_progress' parameter contains information on the replica moves
@@ -260,37 +344,18 @@ class Rebalancer {
                           const MovesInProgress& moves_in_progress,
                           ClusterInfo* info) const;
 
-  // Get next batch of replica moves from the rebalancing algorithm.
-  // Essentially, it runs ksck against the cluster and feeds the data into the
-  // rebalancing algorithm along with the information on currently pending
-  // replica movement operations. The information returned by the high-level
-  // rebalancing algorithm is translated into particular replica movement
-  // instructions, which are used to populate the 'replica_moves' parameter
-  // (the container is cleared first).
-  //
-  // The 'moves_in_progress' parameter contains information on pending moves.
-  // The results are output into 'replica_moves', which will be empty
-  // if no next steps are needed to make the cluster balanced.
-  Status GetNextMoves(const MovesInProgress& moves_in_progress,
-                      std::vector<ReplicaMove>* replica_moves);
+  // Run rebalancing using the specified runner.
+  Status RunWith(Runner* runner, RunStatus* result_status);
 
-  // Given information from the high-level rebalancing algorithm, find
-  // appropriate tablet replicas to move on the specified tablet servers.
-  // The set of result UUIDs is output into the 'tablet_ids' container (note:
-  // the output container is first cleared). If no suitable replicas are found,
-  // 'tablet_ids' will be empty with the result status of Status::OK().
-  Status FindReplicas(const TableReplicaMove& move,
-                      const KsckResults& ksck_info,
-                      std::vector<std::string>* tablet_ids) const;
+  // Refresh the information on the cluster (involves running ksck).
+  Status GetClusterRawInfo(ClusterRawInfo* raw_info);
+
+  Status GetNextMoves(Runner* runner,
+                      std::vector<ReplicaMove>* replica_moves);
 
   // Reset ksck-related fields and run ksck against the cluster.
   Status RefreshKsckResults();
 
-  // Filter out move operations at the tablets which already have operations
-  // in progress. The 'replica_moves' cannot be null.
-  void FilterMoves(const MovesInProgress& scheduled_moves,
-                   std::vector<ReplicaMove>* replica_moves);
-
   // Configuration for the rebalancer.
   const Config config_;