You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/11/21 03:17:36 UTC

[kudu] 02/02: KUDU-2914: Rebalance tool support moving replicas from some specific tablet servers

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

View the commit online:
https://github.com/apache/kudu/commit/9b7669f6ac5b615d172557aa438ed2518fbee3f0

commit 9b7669f6ac5b615d172557aa438ed2518fbee3f0
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Wed Aug 28 14:43:36 2019 +0800

    KUDU-2914: Rebalance tool support moving replicas from some specific tablet servers
    
    To support moving replicas off specific tablet servers,
    this patch re-uses the '--ignored_tservers' flag and adds a
    '--move_replicas_from_ignored_tservers' flag to the
    `kudu cluster rebalance` CLI tool.
    
    Once the flag '--ignored_tservers' is specified, the given
    tablet servers are not considered as a part of the cluster,
    both their health state and replicas on them are ignored
    by the rebalancer tool.
    
    If '--move_replicas_from_ignored_tservers' is also enabled,
    replicas on healthy ignored tservers are first moved to other
    tservers, and then the rebalancing runs on the other
    healthy tservers in the cluster.
    
    Additionally, if we want to move replicas from some specified
    tablet servers to other servers, the specified tablet servers
    should be set into maintenance_mode first, otherwise the
    rebalancer tool would not run.
    
    Change-Id: I86cfb740030946c13db1a9ca63d241f4907d6c89
    Reviewed-on: http://gerrit.cloudera.org:8080/14154
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/rebalance/rebalance-test.cc   | 149 ++++++++++++++---
 src/kudu/rebalance/rebalance_algo.h    |   8 +
 src/kudu/rebalance/rebalancer.cc       |  41 ++++-
 src/kudu/rebalance/rebalancer.h        |  25 ++-
 src/kudu/tools/rebalancer_tool-test.cc | 274 +++++++++++++++++++++++++++++-
 src/kudu/tools/rebalancer_tool.cc      | 294 +++++++++++++++++++++++++++------
 src/kudu/tools/rebalancer_tool.h       |  93 ++++++++++-
 src/kudu/tools/tool_action_cluster.cc  |  19 ++-
 8 files changed, 801 insertions(+), 102 deletions(-)

diff --git a/src/kudu/rebalance/rebalance-test.cc b/src/kudu/rebalance/rebalance-test.cc
index a46c454..76afde6 100644
--- a/src/kudu/rebalance/rebalance-test.cc
+++ b/src/kudu/rebalance/rebalance-test.cc
@@ -18,14 +18,15 @@
 #include <algorithm>
 #include <cstdint>
 #include <iostream>
-#include <iterator>
 #include <map>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rebalance/cluster_status.h"
 #include "kudu/rebalance/rebalance_algo.h"
@@ -37,11 +38,10 @@ using kudu::cluster_summary::ServerHealthSummary;
 using kudu::cluster_summary::TableSummary;
 using kudu::cluster_summary::TabletSummary;
 
-using std::inserter;
+using std::multimap;
 using std::ostream;
 using std::sort;
 using std::string;
-using std::transform;
 using std::vector;
 using strings::Substitute;
 
@@ -129,17 +129,17 @@ ClusterRawInfo GenerateRawClusterInfo(const KsckResultsInput& input) {
 // of insertion and does not change. Since the insertion order is not
 // important for the comparison with the reference results, this comparison
 // operator normalizes both the 'lhs' and 'rhs', so the comparison operator
-// compares only the contents of the 'servers_by_replica_count', not the order
-// of the elements.
-bool HasSameContents(const ServersByCountMap& lhs,
-                     const ServersByCountMap& rhs) {
+// compares only the contents of the 'servers_by_replica_count' and
+// 'table_info_by_skew', not the order of the elements.
+template<typename T>
+bool HasSameContents(const multimap<int32_t, T>& lhs, const multimap<int32_t, T>& rhs) {
   if (lhs.size() != rhs.size()) {
     return false;
   }
 
   auto it_lhs = lhs.begin();
   auto it_rhs = rhs.begin();
-  for (; it_lhs != lhs.end() && it_rhs != rhs.end(); ) {
+  while (it_lhs != lhs.end() && it_rhs != rhs.end()) {
     auto key_lhs = it_lhs->first;
     auto key_rhs = it_rhs->first;
     if (key_lhs != key_rhs) {
@@ -149,23 +149,19 @@ bool HasSameContents(const ServersByCountMap& lhs,
     auto eq_range_lhs = lhs.equal_range(key_lhs);
     auto eq_range_rhs = rhs.equal_range(key_rhs);
 
-    vector<string> lhs_values;
+    vector<T> lhs_values;
     {
-      transform(eq_range_lhs.first, eq_range_lhs.second,
-                inserter(lhs_values, lhs_values.begin()),
-                [](const ServersByCountMap::value_type& elem) {
-                  return elem.second;
-                });
+      for (auto it = eq_range_lhs.first; it != eq_range_lhs.second; ++it) {
+        lhs_values.push_back(it->second);
+      }
       sort(lhs_values.begin(), lhs_values.end());
     }
 
-    vector<string> rhs_values;
+    vector<T> rhs_values;
     {
-      transform(eq_range_rhs.first, eq_range_rhs.second,
-                inserter(rhs_values, rhs_values.begin()),
-                [](const ServersByCountMap::value_type& elem) {
-                  return elem.second;
-                });
+      for (auto it = eq_range_rhs.first; it != eq_range_rhs.second; ++it) {
+        rhs_values.push_back(it->second);
+      }
       sort(rhs_values.begin(), rhs_values.end());
     }
 
@@ -184,13 +180,19 @@ bool HasSameContents(const ServersByCountMap& lhs,
 } // anonymous namespace
 
 bool operator==(const TableBalanceInfo& lhs, const TableBalanceInfo& rhs) {
-  return HasSameContents(lhs.servers_by_replica_count,
-                         rhs.servers_by_replica_count);
+  return
+    lhs.table_id == rhs.table_id &&
+    HasSameContents(lhs.servers_by_replica_count,
+                    rhs.servers_by_replica_count);
+}
+
+bool operator<(const TableBalanceInfo& lhs, const TableBalanceInfo& rhs) {
+  return lhs.table_id < rhs.table_id;
 }
 
 bool operator==(const ClusterBalanceInfo& lhs, const ClusterBalanceInfo& rhs) {
   return
-      lhs.table_info_by_skew == rhs.table_info_by_skew &&
+      HasSameContents(lhs.table_info_by_skew, rhs.table_info_by_skew) &&
       HasSameContents(lhs.servers_by_total_replica_count,
                       rhs.servers_by_total_replica_count);
 }
@@ -228,7 +230,26 @@ class KsckResultsToClusterBalanceInfoTest : public ::testing::Test {
           raw_info, Rebalancer::MovesInProgress(), &ci));
 
       ASSERT_EQ(cfg.ref_balance_info, ci.balance);
+      // Make sure that ClusterInfo::balance doesn't contain any ignored tserver,
+      // so no replica would be moved to ignored tservers.
+      ASSERT_FALSE(ContainsOneOfIgnoredTservers(ci.balance.servers_by_total_replica_count,
+                                                rebalancer_cfg.ignored_tservers));
+      for (const auto& elem : ci.balance.table_info_by_skew) {
+        SCOPED_TRACE(Substitute("check table $0 ", elem.second.table_id));
+        ASSERT_FALSE(ContainsOneOfIgnoredTservers(elem.second.servers_by_replica_count,
+                                                  rebalancer_cfg.ignored_tservers));
+      }
+    }
+  }
+ private:
+  bool ContainsOneOfIgnoredTservers(const ServersByCountMap& map,
+                                    const std::unordered_set<string>& ignored_tservers) {
+    for (const auto& elem : map) {
+      if (ContainsKey(ignored_tservers, elem.second)) {
+        return true;
+      }
     }
+    return false;
   }
 };
 
@@ -242,6 +263,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
     5,      // max_moves_per_server
     30,     // max_staleness_interval_sec
     0,      // max_run_time_sec
+    false,  // move_replicas_from_ignored_tservers
     true,   // move_rf1_replicas
   };
 
@@ -379,6 +401,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
     5,      // max_moves_per_server
     30,     // max_staleness_interval_sec
     0,      // max_run_time_sec
+    false,  // move_replicas_from_ignored_tservers
     false,  // move_rf1_replicas
   };
 
@@ -400,7 +423,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
         { { 0, "ts_0" }, }
       }
     },
-    // Two tserver, two tables, RF=1.
+    // Two tservers, two tables, RF=1.
     {
       {
         { { "ts_0" }, { "ts_1" }, },
@@ -454,5 +477,83 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
   NO_FATALS(RunTest(rebalancer_config, test_configs));
 }
 
+// Test converting KsckResults result into ClusterBalanceInfo if moving
+// replicas from specified 'ignored_tservers'.
+TEST_F(KsckResultsToClusterBalanceInfoTest, MoveIgnoredTserversReplicas) {
+  const Rebalancer::Config rebalancer_config = {
+    { "ts_0", "ts_1" },     // ignored_tservers
+    {},     // master_addresses
+    {},     // table_filters
+    5,      // max_moves_per_server
+    30,     // max_staleness_interval_sec
+    0,      // max_run_time_sec
+    true,   // move_replicas_from_ignored_tservers
+    false,  // move_rf1_replicas
+  };
+
+  const vector<KsckResultsTestConfig> test_configs = {
+    // five tservers, three tables
+    // table_a: 1 tablet with RF=3
+    // table_b: 2 tablets with RF=3
+    // table_c: 3 tablets with RF=3
+    {
+      {
+        { { "ts_0" }, { "ts_1" }, { "ts_2" }, { "ts_3" }, { "ts_4" }, },
+        {
+          { "tablet_a_0", "table_a", { { "ts_0", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_1", true }, }, },
+          { "tablet_a_0", "table_a", { { "ts_2", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_1", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_2", true }, }, },
+          { "tablet_b_0", "table_b", { { "ts_3", true }, }, },
+          { "tablet_b_1", "table_b", { { "ts_2", true }, }, },
+          { "tablet_b_1", "table_b", { { "ts_3", true }, }, },
+          { "tablet_b_1", "table_b", { { "ts_4", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_1", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_2", true }, }, },
+          { "tablet_c_0", "table_c", { { "ts_3", true }, }, },
+          { "tablet_c_1", "table_c", { { "ts_1", true }, }, },
+          { "tablet_c_1", "table_c", { { "ts_2", true }, }, },
+          { "tablet_c_1", "table_c", { { "ts_3", true }, }, },
+          { "tablet_c_2", "table_c", { { "ts_2", true }, }, },
+          { "tablet_c_2", "table_c", { { "ts_3", true }, }, },
+          { "tablet_c_2", "table_c", { { "ts_4", true }, }, },
+        },
+        { { { "table_a", 3 }, { "table_b", 3 }, { "table_c", 3 }, } },
+      },
+      {
+        {
+          {
+            1, {
+              "table_a", {
+                { 0, "ts_4" }, { 0, "ts_3" }, { 1, "ts_2" },
+              }
+            }
+          },
+          {
+            1, {
+              "table_b", {
+                { 1, "ts_4" }, { 2, "ts_3" }, { 2, "ts_2" },
+              }
+            }
+          },
+          {
+            2, {
+              "table_c", {
+                { 1, "ts_4" }, { 3, "ts_3" }, { 3, "ts_2" },
+              }
+            }
+          },
+        },
+        {
+          { 2, "ts_4" }, { 5, "ts_3" }, { 6, "ts_2" },
+        },
+      }
+    },
+  };
+
+  NO_FATALS(RunTest(rebalancer_config, test_configs));
+}
+
 } // namespace rebalance
 } // namespace kudu
diff --git a/src/kudu/rebalance/rebalance_algo.h b/src/kudu/rebalance/rebalance_algo.h
index f5b790a..b827f0a 100644
--- a/src/kudu/rebalance/rebalance_algo.h
+++ b/src/kudu/rebalance/rebalance_algo.h
@@ -82,8 +82,16 @@ struct ClusterLocalityInfo {
 
 // Information on a cluster as input for various rebalancing algorithms.
 struct ClusterInfo {
+  // Balance information for a cluster,
+  // excluding ignored tablet servers and replicas on them.
   ClusterBalanceInfo balance;
+
+  // Locality information for a cluster.
   ClusterLocalityInfo locality;
+
+  // Mapping tserver identifier --> total replica count on the server.
+  // Replicas on these tablet servers need to move to other tservers in the cluster.
+  std::unordered_map<std::string, int> tservers_to_empty;
 };
 
 // A directive to move some replica of a table between two tablet servers.
diff --git a/src/kudu/rebalance/rebalancer.cc b/src/kudu/rebalance/rebalancer.cc
index b3c1c6c..84ee672 100644
--- a/src/kudu/rebalance/rebalancer.cc
+++ b/src/kudu/rebalance/rebalancer.cc
@@ -64,6 +64,7 @@ Rebalancer::Config::Config(
     size_t max_moves_per_server,
     size_t max_staleness_interval_sec,
     int64_t max_run_time_sec,
+    bool move_replicas_from_ignored_tservers,
     bool move_rf1_replicas,
     bool output_replica_distribution_details,
     bool run_policy_fixer,
@@ -76,6 +77,7 @@ Rebalancer::Config::Config(
       max_moves_per_server(max_moves_per_server),
       max_staleness_interval_sec(max_staleness_interval_sec),
       max_run_time_sec(max_run_time_sec),
+      move_replicas_from_ignored_tservers(move_replicas_from_ignored_tservers),
       move_rf1_replicas(move_rf1_replicas),
       output_replica_distribution_details(output_replica_distribution_details),
       run_policy_fixer(run_policy_fixer),
@@ -101,9 +103,9 @@ Rebalancer::Rebalancer(Config config)
 // replicas of affected tablets would make the client to re-resolve new leaders
 // and retry the operations. Moving leader replicas is used as last resort
 // when no other candidates are left.
-Status Rebalancer::FindReplicas(const TableReplicaMove& move,
-                                const ClusterRawInfo& raw_info,
-                                vector<string>* tablet_ids) {
+void Rebalancer::FindReplicas(const TableReplicaMove& move,
+                              const ClusterRawInfo& raw_info,
+                              vector<string>* tablet_ids) {
   const auto& table_id = move.table_id;
 
   // Tablet ids of replicas on the source tserver that are non-leaders.
@@ -162,7 +164,7 @@ Status Rebalancer::FindReplicas(const TableReplicaMove& move,
     // If there are tablets with non-leader replicas at the source server,
     // those are the best candidates for movement.
     tablet_ids->swap(tablet_uuids);
-    return Status::OK();
+    return;
   }
 
   // If no tablets with non-leader replicas were found, resort to tablets with
@@ -175,8 +177,6 @@ Status Rebalancer::FindReplicas(const TableReplicaMove& move,
       inserter(tablet_uuids, tablet_uuids.begin()));
 
   tablet_ids->swap(tablet_uuids);
-
-  return Status::OK();
 }
 
 void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves,
@@ -282,6 +282,7 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
 
   unordered_map<string, int32_t> tserver_replicas_count;
   unordered_map<string, TableReplicasAtServer> table_replicas_info;
+  unordered_set<string> unhealthy_tablet_servers;
 
   // Build a set of tables with RF=1 (single replica tables).
   unordered_set<string> rf1_tables;
@@ -311,6 +312,7 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
                               "non-HEALTHY status ($2)",
                               s.uuid, s.address,
                               ServerHealthToString(s.health));
+      unhealthy_tablet_servers.emplace(s.uuid);
       continue;
     }
     tserver_replicas_count.emplace(s.uuid, 0);
@@ -318,7 +320,7 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
 
   for (const auto& tablet : raw_info.tablet_summaries) {
     if (!config_.move_rf1_replicas) {
-      if (rf1_tables.find(tablet.table_id) != rf1_tables.end()) {
+      if (ContainsKey(rf1_tables, tablet.table_id)) {
         LOG(INFO) << Substitute("tablet $0 of table '$1' ($2) has single replica, skipping",
                                 tablet.id, tablet.table_name, tablet.table_id);
         continue;
@@ -409,6 +411,10 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
   // Populate ClusterBalanceInfo::servers_by_total_replica_count
   auto& servers_by_count = result_info.balance.servers_by_total_replica_count;
   for (const auto& elem : tserver_replicas_count) {
+    if (ContainsKey(config_.ignored_tservers, elem.first)) {
+      VLOG(1) << Substitute("ignoring tserver $0", elem.first);
+      continue;
+    }
     servers_by_count.emplace(elem.second, elem.first);
   }
 
@@ -423,12 +429,33 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info,
     for (const auto& e : elem.second) {
       const auto& ts_uuid = e.first;
       const auto replica_count = e.second;
+      if (ContainsKey(config_.ignored_tservers, ts_uuid)) {
+        VLOG(1) << Substitute("ignoring replicas of table $0 on tserver $1", table_id, ts_uuid);
+        continue;
+      }
       tbi.servers_by_replica_count.emplace(replica_count, ts_uuid);
       max_count = std::max(replica_count, max_count);
       min_count = std::min(replica_count, min_count);
     }
     table_info_by_skew.emplace(max_count - min_count, std::move(tbi));
   }
+
+  // Populate ClusterInfo::tservers_to_empty
+  if (config_.move_replicas_from_ignored_tservers) {
+    auto& tservers_to_empty = result_info.tservers_to_empty;
+    for (const auto& ignored_tserver : config_.ignored_tservers) {
+      if (ContainsKey(unhealthy_tablet_servers, ignored_tserver)) {
+        continue;
+      }
+      const int* replica_count = FindOrNull(tserver_replicas_count, ignored_tserver);
+      if (!replica_count) {
+        return Status::InvalidArgument(Substitute(
+            "ignored tserver $0 is not reported among known tservers", ignored_tserver));
+      }
+      tservers_to_empty.emplace(ignored_tserver, *replica_count);
+    }
+  }
+
   // TODO(aserbin): add sanity checks on the result.
   *info = std::move(result_info);
 
diff --git a/src/kudu/rebalance/rebalancer.h b/src/kudu/rebalance/rebalancer.h
index ed6fcfc..d184cb2 100644
--- a/src/kudu/rebalance/rebalancer.h
+++ b/src/kudu/rebalance/rebalancer.h
@@ -59,6 +59,7 @@ class Rebalancer {
            size_t max_moves_per_server = 5,
            size_t max_staleness_interval_sec = 300,
            int64_t max_run_time_sec = 0,
+           bool move_replicas_from_ignored_tservers = false,
            bool move_rf1_replicas = false,
            bool output_replica_distribution_details = false,
            bool run_policy_fixer = true,
@@ -66,10 +67,11 @@ class Rebalancer {
            bool run_intra_location_rebalancing = true,
            double load_imbalance_threshold = kLoadImbalanceThreshold);
 
-    // UUIDs of ignored servers. If empty, allow to run the
-    // rebalancing only when all tablet servers in cluster are healthy.
-    // If not empty, allow to run the rebalancing when servers in
-    // ignored_tservers are unhealthy.
+    // UUIDs of ignored servers. If empty, run the rebalancing on
+    // all tablet servers in the cluster only when all tablet servers
+    // in cluster are healthy. If not empty, specified tablet servers
+    // (including their health state and replicas on them) will be
+    // ignored by the rebalancer tool.
     std::unordered_set<std::string> ignored_tservers;
 
     // Kudu masters' RPC endpoints.
@@ -95,6 +97,13 @@ class Rebalancer {
     // Maximum run time, in seconds.
     int64_t max_run_time_sec;
 
+    // Whether to move replicas from ignored tservers to other tservers.
+    // If true, replicas on healthy ignored tservers will be moved to other tservers
+    // before running the rebalancing.
+    // If false, ignored tservers and replicas on them will be ignored by the
+    // rebalancer tool.
+    bool move_replicas_from_ignored_tservers;
+
     // Whether to move replicas of tablets with replication factor of one.
     bool move_rf1_replicas;
 
@@ -198,10 +207,10 @@ class Rebalancer {
   // The source and destination replicas are determined by the elements of the
   // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and
   // TableReplica::to correspondingly. If no suitable tablet replicas are found,
-  // 'tablet_ids' will be empty with the result status of Status::OK().
-  static Status FindReplicas(const TableReplicaMove& move,
-                             const ClusterRawInfo& raw_info,
-                             std::vector<std::string>* tablet_ids);
+  // 'tablet_ids' will be empty.
+  static void FindReplicas(const TableReplicaMove& move,
+                           const ClusterRawInfo& raw_info,
+                           std::vector<std::string>* tablet_ids);
 
   // Filter move operations in 'replica_moves': remove all operations that would
   // involve moving replicas of tablets which are in 'scheduled_moves'. The
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 18063ee..c81aad6 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -45,6 +45,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/cluster_verifier.h"
@@ -234,6 +235,153 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
   }
 }
 
+// Make sure the rebalancer doesn't start if too many ignored tservers are specified.
+class RebalanceStartSafetyTest :
+    public AdminCliTest,
+    public ::testing::WithParamInterface<Kudu1097> {
+};
+INSTANTIATE_TEST_CASE_P(, RebalanceStartSafetyTest,
+                        ::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
+TEST_P(RebalanceStartSafetyTest, TooManyIgnoredTservers) {
+  const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
+  const vector<string> kMasterFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+  };
+
+  FLAGS_num_tablet_servers = 5;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Assign 3 ignored tservers.
+  vector<string> ignored_tservers;
+  for (int i = 0; i < 3; i++) {
+    auto* ts = cluster_->tablet_server(i);
+    ASSERT_NE(nullptr, ts);
+    ignored_tservers.emplace_back(ts->uuid());
+  }
+
+  // Assign move_replicas_from_ignored_tservers=false.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + JoinStrings(ignored_tservers, ","),
+      "--move_replicas_from_ignored_tservers=false"
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
+  }
+
+  // Assign move_replicas_from_ignored_tservers=true.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + JoinStrings(ignored_tservers, ","),
+      "--move_replicas_from_ignored_tservers=true"
+    }, &out, &err);
+    ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
+    ASSERT_STR_CONTAINS(err,
+        "Too many ignored tservers; 2 healthy non-ignored servers exist but 3 are required")
+        << "stderr: " << err;
+  }
+}
+
+class RebalanceIgnoredTserversTest :
+    public AdminCliTest {
+};
+TEST_F(RebalanceIgnoredTserversTest, Basic) {
+  FLAGS_num_tablet_servers = 5;
+  NO_FATALS(BuildAndStart());
+
+  // Assign one ignored tserver and move_replicas_from_ignored_tservers=true
+  // without setting it into maintenance mode.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + cluster_->tablet_server(0)->uuid(),
+      "--move_replicas_from_ignored_tservers"
+    }, &out, &err);
+    ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
+    ASSERT_STR_MATCHES(err, "You should set maintenance mode for tablet server");
+  }
+
+  // Set the ignored tserver into maintenance mode.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "tserver",
+      "state",
+      "enter_maintenance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      cluster_->tablet_server(0)->uuid()
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  }
+
+  // Run the rebalance again.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + cluster_->tablet_server(0)->uuid(),
+      "--move_replicas_from_ignored_tservers"
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+
+    // There would be no replica on the ignored tserver after rebalancing.
+    ASSERT_STR_CONTAINS(out, Substitute("$0 | 0", cluster_->tablet_server(0)->uuid()))
+        << "stderr: " << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
+  }
+
+  // Assign two ignored tservers, one of which is unhealthy,
+  // 'move_replicas_from_ignored_tservers=true' and
+  // 'output_replica_distribution_details' are both enabled.
+  auto* ts = cluster_->tablet_server(1);
+  ASSERT_NE(nullptr, ts);
+  ts->Shutdown();
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + cluster_->tablet_server(0)->uuid() + "," + ts->uuid(),
+      "--move_replicas_from_ignored_tservers",
+      "--output_replica_distribution_details"
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+
+    // The output info would not contain the unhealthy server.
+    ASSERT_STR_NOT_CONTAINS(out, ts->uuid());
+    // There would be no replica on the healthy ignored tserver after rebalancing.
+    ASSERT_STR_CONTAINS(out, Substitute("$0 | 0", cluster_->tablet_server(0)->uuid()))
+        << "stderr: " << err;
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
+        << "stderr: " << err;
+  }
+}
+
 static Status CreateTables(
     cluster::ExternalMiniCluster* cluster,
     client::KuduClient* client,
@@ -592,6 +740,128 @@ const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0"
 
 typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest;
 
+// If an ignored tablet server went down during the process of copying data,
+// the rebalancer would ignore the health state of the ignored tablet server,
+// and replicas on it, run the rebalancing on the other tablet servers and
+// exit normally.
+class IgnoredTserverGoesDownDuringRebalancingTest : public RebalancingTest {
+ public:
+  IgnoredTserverGoesDownDuringRebalancingTest()
+      : RebalancingTest(/*num_tables=*/ 5) {
+  }
+  bool is_343_scheme() const override {
+    return true;
+  }
+};
+TEST_F(IgnoredTserverGoesDownDuringRebalancingTest, TserverDown) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const vector<string> kTserverExtraFlags = {
+    // Slow down tablet copy to make the rebalancing step run longer
+    // and become observable via tablet data states output by ksck.
+    "--tablet_copy_download_file_inject_latency_ms=1500",
+
+    "--follower_unavailable_considered_failed_sec=30",
+  };
+  NO_FATALS(Prepare(kTserverExtraFlags));
+
+  // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
+  // shortly after initial setup.
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_TOOL_OK(
+      "cluster",
+      "ksck",
+      cluster_->master()->bound_rpc_addr().ToString()
+    );
+  });
+
+  const uint32_t ignored_tserver_idx = 0;
+
+  // Set the ignored tserver into maintenance mode.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "tserver",
+      "state",
+      "enter_maintenance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      cluster_->tablet_server(ignored_tserver_idx)->uuid()
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  }
+
+  atomic<bool> run(true);
+  // The thread that shuts down the ignored tserver.
+  thread stopper([&]() {
+    while (run && !IsRebalancingInProgress()) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    // All right, it's time to stop the ignored tserver.
+    cluster_->tablet_server(ignored_tserver_idx)->Shutdown();
+  });
+  auto stopper_cleanup = MakeScopedCleanup([&]() {
+    run = false;
+    stopper.join();
+  });
+
+  {
+    string out;
+    string err;
+    const auto s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + cluster_->tablet_server(ignored_tserver_idx)->uuid(),
+      "--move_replicas_from_ignored_tservers",
+      // Limiting the number of replicas to move. This is to make the rebalancer
+      // run longer, making sure the rebalancing is in progress when the tablet
+      // server goes down.
+      "--max_moves_per_server=1",
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+
+    // The rebalancer tool should not crash.
+    ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
+    // The rebalancer tool would log some information of the unhealthy server.
+    ASSERT_STR_CONTAINS(err, Substitute("ignoring unhealthy tablet server $0",
+                                       cluster_->tablet_server(ignored_tserver_idx)->uuid()));
+
+    // Restart the ignored tablet server
+    ASSERT_OK(cluster_->tablet_server(ignored_tserver_idx)->Restart());
+    NO_FATALS(cluster_->AssertNoCrashes());
+
+    // Report replica distribution.
+    {
+      string out;
+      string err;
+      const auto s = RunKuduTool({
+        "cluster",
+        "rebalance",
+        cluster_->master()->bound_rpc_addr().ToString(),
+        "--output_replica_distribution_details",
+        "--report_only",
+      }, &out, &err);
+      ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+      // There should still be some replicas on the ignored tablet server:
+      // they haven't been moved to other tablet servers by the rebalancer tool.
+      ASSERT_STR_NOT_CONTAINS(out,
+          Substitute("$0 | $1 | 0",
+                     cluster_->tablet_server(ignored_tserver_idx)->uuid(),
+                     cluster_->tablet_server(ignored_tserver_idx)->bound_rpc_addr().ToString()));
+    }
+  }
+
+  run = false;
+  stopper.join();
+  stopper_cleanup.cancel();
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
 // Make sure the rebalancer is able to do its job if running concurrently
 // with DDL activity on the cluster.
 class DDLDuringRebalancingTest : public RebalancingTest,
@@ -882,8 +1152,8 @@ TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
 class TserverGoesDownDuringRebalancingTest : public RebalancingTest,
                                              public Kudu1097ParamTest {
  public:
-  TserverGoesDownDuringRebalancingTest() :
-      RebalancingTest(/*num_tables=*/ 5) {
+  TserverGoesDownDuringRebalancingTest()
+      : RebalancingTest(/*num_tables=*/ 5) {
   }
 
   bool is_343_scheme() const override {
diff --git a/src/kudu/tools/rebalancer_tool.cc b/src/kudu/tools/rebalancer_tool.cc
index d05e790..3a8192c 100644
--- a/src/kudu/tools/rebalancer_tool.cc
+++ b/src/kudu/tools/rebalancer_tool.cc
@@ -37,10 +37,14 @@
 #include <glog/logging.h>
 
 #include "kudu/client/client.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
 #include "kudu/rebalance/cluster_status.h"
 #include "kudu/rebalance/placement_policy_util.h"
 #include "kudu/rebalance/rebalance_algo.h"
@@ -58,6 +62,9 @@ using kudu::cluster_summary::ServerHealth;
 using kudu::cluster_summary::ServerHealthSummary;
 using kudu::cluster_summary::TableSummary;
 using kudu::cluster_summary::TabletSummary;
+using kudu::master::ListTabletServersRequestPB;
+using kudu::master::ListTabletServersResponsePB;
+using kudu::master::MasterServiceProxy;
 using kudu::rebalance::ClusterInfo;
 using kudu::rebalance::ClusterRawInfo;
 using kudu::rebalance::PlacementPolicyViolationInfo;
@@ -100,6 +107,9 @@ Status RebalancerTool::PrintStats(ostream& out) {
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
 
+  // Print information about replica count of healthy ignored tservers.
+  RETURN_NOT_OK(PrintIgnoredTserversStats(ci, out));
+
   const auto& ts_id_by_location = ci.locality.servers_by_location;
   if (ts_id_by_location.empty()) {
     // Nothing to report about: there are no tablet servers reported.
@@ -174,6 +184,16 @@ Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
   }
 
   size_t moves_count_total = 0;
+  if (config_.move_replicas_from_ignored_tservers) {
+    // Move replicas from healthy ignored tservers to other healthy tservers.
+    RETURN_NOT_OK(CheckIgnoredServers(raw_info, ci));
+    LOG(INFO) << "replacing replicas on healthy ignored tservers";
+    IgnoredTserversRunner runner(
+        this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
+    RETURN_NOT_OK(runner.Init(config_.master_addresses));
+    RETURN_NOT_OK(RunWith(&runner, result_status));
+    moves_count_total += runner.moves_count();
+  }
   if (ts_id_by_location.size() == 1) {
     const auto& location = ts_id_by_location.cbegin()->first;
     LOG(INFO) << "running whole-cluster rebalancing";
@@ -305,6 +325,23 @@ Status RebalancerTool::KsckResultsToClusterRawInfo(
   return Status::OK();
 }
 
+// Prints into 'out' a table with the replica count for every tablet server
+// listed in 'ci.tservers_to_empty'; prints nothing if there are none.
+Status RebalancerTool::PrintIgnoredTserversStats(const ClusterInfo& ci,
+                                                 ostream& out) const {
+  if (ci.tservers_to_empty.empty()) {
+    return Status::OK();
+  }
+  out << "Per-server replica distribution summary for tservers_to_empty:" << endl;
+  DataTable table({"Server UUID", "Replica Count"});
+  for (const auto& uuid_and_count : ci.tservers_to_empty) {
+    table.AddRow({ uuid_and_count.first, to_string(uuid_and_count.second) });
+  }
+  RETURN_NOT_OK(table.PrintTo(out));
+  out << endl;
+  return Status::OK();
+}
+
 Status RebalancerTool::PrintCrossLocationBalanceStats(const ClusterInfo& ci,
                                                       ostream& out) const {
   // Print location load information.
@@ -594,6 +631,22 @@ Status RebalancerTool::RefreshKsckResults() {
   return Status::OK();
 }
 
+// Fails if fewer tservers remain than the highest replication factor among tables.
+Status RebalancerTool::CheckIgnoredServers(const rebalance::ClusterRawInfo& raw_info,
+                                           const rebalance::ClusterInfo& cluster_info) {
+  int rf_max = 0;
+  for (const auto& table : raw_info.table_summaries) {
+    rf_max = std::max(rf_max, table.replication_factor);
+  }
+  const int ts_count = cluster_info.balance.servers_by_total_replica_count.size();
+  if (ts_count >= rf_max) {
+    return Status::OK();
+  }
+  return Status::InvalidArgument(Substitute(
+      "Too many ignored tservers; "
+      "$0 healthy non-ignored servers exist but $1 are required.", ts_count, rf_max));
+}
+
 RebalancerTool::BaseRunner::BaseRunner(RebalancerTool* rebalancer,
                                        std::unordered_set<std::string> ignored_tservers,
                                        size_t max_moves_per_server,
@@ -870,6 +923,8 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
   for (const auto& s : raw_info.tserver_summaries) {
     if (s.health != ServerHealth::HEALTHY) {
       if (ContainsKey(ignored_tservers_, s.uuid)) {
+        LOG(INFO) << Substitute("ignoring unhealthy tablet server $0 ($1).",
+                                s.uuid, s.address);
         continue;
       }
       return Status::IllegalState(
@@ -932,7 +987,7 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
             });
   for (const auto& move : moves) {
     vector<string> tablet_ids;
-    RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids));
+    FindReplicas(move, raw_info, &tablet_ids);
     if (!loc) {
       // In case of cross-location (a.k.a. inter-location) rebalancing it is
       // necessary to make sure the majority of replicas would not end up
@@ -946,7 +1001,7 @@ Status RebalancerTool::AlgoBasedRunner::GetNextMovesImpl(
     std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_);
     string move_tablet_id;
     for (const auto& tablet_id : tablet_ids) {
-      if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) {
+      if (!ContainsKey(tablets_in_move, tablet_id)) {
         // For now, choose the very first tablet that does not have replicas
         // in move. Later on, additional logic might be added to find
         // the best candidate.
@@ -1083,6 +1138,7 @@ RebalancerTool::IntraLocationRunner::IntraLocationRunner(
                       std::move(deadline)),
       location_(std::move(location)) {
 }
+
 RebalancerTool::CrossLocationRunner::CrossLocationRunner(
     RebalancerTool* rebalancer,
     std::unordered_set<std::string> ignored_tservers,
@@ -1096,7 +1152,7 @@ RebalancerTool::CrossLocationRunner::CrossLocationRunner(
       algorithm_(load_imbalance_threshold) {
 }
 
-RebalancerTool::PolicyFixer::PolicyFixer(
+RebalancerTool::ReplaceBasedRunner::ReplaceBasedRunner(
     RebalancerTool* rebalancer,
     std::unordered_set<std::string> ignored_tservers,
     size_t max_moves_per_server,
@@ -1107,12 +1163,12 @@ RebalancerTool::PolicyFixer::PolicyFixer(
                  std::move(deadline)) {
 }
 
-Status RebalancerTool::PolicyFixer::Init(vector<string> master_addresses) {
+Status RebalancerTool::ReplaceBasedRunner::Init(vector<string> master_addresses) {
   DCHECK(moves_to_schedule_.empty());
   return BaseRunner::Init(std::move(master_addresses));
 }
 
-void RebalancerTool::PolicyFixer::LoadMoves(
+void RebalancerTool::ReplaceBasedRunner::LoadMoves(
     vector<Rebalancer::ReplicaMove> replica_moves) {
   // Replace the list of moves operations to schedule. Even if it's not empty,
   // some elements of it might be irrelevant anyway, so there is no need to
@@ -1138,8 +1194,8 @@ void RebalancerTool::PolicyFixer::LoadMoves(
   }
 }
 
-bool RebalancerTool::PolicyFixer::ScheduleNextMove(bool* has_errors,
-                                                   bool* timed_out) {
+bool RebalancerTool::ReplaceBasedRunner::ScheduleNextMove(bool* has_errors,
+                                                          bool* timed_out) {
   DCHECK(has_errors);
   DCHECK(timed_out);
   *has_errors = false;
@@ -1170,7 +1226,7 @@ bool RebalancerTool::PolicyFixer::ScheduleNextMove(bool* has_errors,
   return true;
 }
 
-bool RebalancerTool::PolicyFixer::UpdateMovesInProgressStatus(
+bool RebalancerTool::ReplaceBasedRunner::UpdateMovesInProgressStatus(
     bool* has_errors, bool* timed_out, bool* has_pending_moves) {
   DCHECK(has_errors);
   DCHECK(timed_out);
@@ -1205,7 +1261,6 @@ bool RebalancerTool::PolicyFixer::UpdateMovesInProgressStatus(
       // The replacement has completed (success or failure): update the stats
       // on the pending operations per server.
       ++moves_count_;
-      has_updates = true;
       LOG(INFO) << Substitute("tablet $0: '$1' -> '?' move completed: $2",
                               tablet_id, ts_uuid, completion_status.ToString());
       UpdateOnMoveCompleted(ts_uuid);
@@ -1219,7 +1274,7 @@ bool RebalancerTool::PolicyFixer::UpdateMovesInProgressStatus(
   return has_updates;
 }
 
-Status RebalancerTool::PolicyFixer::GetNextMovesImpl(
+Status RebalancerTool::ReplaceBasedRunner::GetNextMovesImpl(
     vector<Rebalancer::ReplicaMove>* replica_moves) {
   ClusterRawInfo raw_info;
   RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(boost::none, &raw_info));
@@ -1232,6 +1287,8 @@ Status RebalancerTool::PolicyFixer::GetNextMovesImpl(
   for (const auto& s : raw_info.tserver_summaries) {
     if (s.health != ServerHealth::HEALTHY) {
       if (ContainsKey(ignored_tservers_, s.uuid)) {
+        LOG(INFO) << Substitute("ignoring unhealthy tablet server $0 ($1).",
+                                s.uuid, s.address);
         continue;
       }
       return Status::IllegalState(
@@ -1241,45 +1298,10 @@ Status RebalancerTool::PolicyFixer::GetNextMovesImpl(
   }
   ClusterInfo ci;
   RETURN_NOT_OK(rebalancer_->BuildClusterInfo(raw_info, scheduled_moves_, &ci));
-
-  TabletsPlacementInfo placement_info;
-  RETURN_NOT_OK(
-      BuildTabletsPlacementInfo(raw_info, scheduled_moves_, &placement_info));
-
-  vector<PlacementPolicyViolationInfo> ppvi;
-  RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi));
-
-  // Filter out all reported violations which are already taken care of.
-  // The idea is to have not more than one pending operation per tablet.
-  {
-    decltype(ppvi) ppvi_filtered;
-    for (auto& info : ppvi) {
-      if (ContainsKey(scheduled_moves_, info.tablet_id)) {
-        continue;
-      }
-      ppvi_filtered.emplace_back(std::move(info));
-    }
-    ppvi = std::move(ppvi_filtered);
-  }
-
-  RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
-      placement_info, ci.locality, ppvi, replica_moves));
-
-  if (PREDICT_FALSE(VLOG_IS_ON(1))) {
-    for (const auto& info : ppvi) {
-      VLOG(1) << Substitute("policy violation at location '$0': tablet $1",
-                            info.majority_location, info.tablet_id);
-    }
-    for (const auto& move : *replica_moves) {
-      VLOG(1) << Substitute("policy fix for tablet $0: replica to remove $1",
-                            move.tablet_uuid, move.ts_uuid_from);
-    }
-  }
-
-  return Status::OK();
+  return GetReplaceMoves(ci, raw_info, replica_moves);
 }
 
-bool RebalancerTool::PolicyFixer::FindNextMove(Rebalancer::ReplicaMove* move) {
+bool RebalancerTool::ReplaceBasedRunner::FindNextMove(Rebalancer::ReplicaMove* move) {
   DCHECK(move);
   // use pessimistic /2 limit for max_moves_per_server_ since the
   // desitnation servers for the move of the replica marked with
@@ -1296,7 +1318,7 @@ bool RebalancerTool::PolicyFixer::FindNextMove(Rebalancer::ReplicaMove* move) {
   return false;
 }
 
-void RebalancerTool::PolicyFixer::UpdateOnMoveScheduled(Rebalancer::ReplicaMove move) {
+void RebalancerTool::ReplaceBasedRunner::UpdateOnMoveScheduled(Rebalancer::ReplicaMove move) {
   const auto tablet_uuid = move.tablet_uuid;
   const auto ts_uuid = move.ts_uuid_from;
 
@@ -1331,5 +1353,181 @@ void RebalancerTool::PolicyFixer::UpdateOnMoveScheduled(Rebalancer::ReplicaMove
   DCHECK(ts_op_count_updated);
 }
 
+// Constructs the runner; all the arguments are handed over to
+// the ReplaceBasedRunner base class as is.
+RebalancerTool::PolicyFixer::PolicyFixer(
+    RebalancerTool* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline)
+    : ReplaceBasedRunner(rebalancer, std::move(ignored_tservers),
+                         max_moves_per_server, std::move(deadline)) {
+}
+
+// Get the replica moves to fix the detected violations of the placement policy.
+// Violations for the tablets which already have a move scheduled are skipped.
+// The moves are output into 'replica_moves'; any error of the helper
+// functions called along the way is returned as is.
+Status RebalancerTool::PolicyFixer::GetReplaceMoves(
+    const rebalance::ClusterInfo& ci,
+    const rebalance::ClusterRawInfo& raw_info,
+    vector<Rebalancer::ReplicaMove>* replica_moves) {
+  TabletsPlacementInfo tpi;
+  RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, scheduled_moves_, &tpi));
+
+  vector<PlacementPolicyViolationInfo> detected;
+  RETURN_NOT_OK(DetectPlacementPolicyViolations(tpi, &detected));
+
+  // The idea is to have not more than one pending operation per tablet, so
+  // keep only the violations which are not taken care of yet.
+  vector<PlacementPolicyViolationInfo> violations;
+  for (auto& v : detected) {
+    if (!ContainsKey(scheduled_moves_, v.tablet_id)) {
+      violations.emplace_back(std::move(v));
+    }
+  }
+
+  RETURN_NOT_OK(FindMovesToReimposePlacementPolicy(
+      tpi, ci.locality, violations, replica_moves));
+
+  // Spell out the violations and the fixes only if verbose logging is on.
+  if (PREDICT_FALSE(VLOG_IS_ON(1))) {
+    for (const auto& v : violations) {
+      VLOG(1) << Substitute("policy violation at location '$0': tablet $1",
+                            v.majority_location, v.tablet_id);
+    }
+    for (const auto& m : *replica_moves) {
+      VLOG(1) << Substitute("policy fix for tablet $0: replica to remove $1",
+                            m.tablet_uuid, m.ts_uuid_from);
+    }
+  }
+
+  return Status::OK();
+}
+
+// Besides passing the arguments over to the ReplaceBasedRunner base class,
+// seeds the random number generator from the random device.
+RebalancerTool::IgnoredTserversRunner::IgnoredTserversRunner(
+    RebalancerTool* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
+    size_t max_moves_per_server,
+    boost::optional<MonoTime> deadline)
+    : ReplaceBasedRunner(rebalancer, std::move(ignored_tservers),
+                         max_moves_per_server, std::move(deadline)),
+      random_generator_(random_device_()) {
+}
+
+Status RebalancerTool::IgnoredTserversRunner::GetReplaceMoves(
+    const rebalance::ClusterInfo& ci,
+    const rebalance::ClusterRawInfo& raw_info,
+    vector<Rebalancer::ReplicaMove>* replica_moves) {
+  // In order not to place any replica on the ignored tservers,
+  // allow to run only when all healthy ignored tservers are in
+  // maintenance (or decommission) mode.
+  RETURN_NOT_OK(CheckIgnoredTserversState(ci));
+
+  // Group the tablets to move by the tserver in 'ci.tservers_to_empty' hosting them.
+  IgnoredTserversInfo ignored_tservers_info;
+  for (const auto& tablet_summary : raw_info.tablet_summaries) {
+    if (ContainsKey(scheduled_moves_, tablet_summary.id)) {
+      continue;
+    }
+    if (tablet_summary.result != cluster_summary::HealthCheckResult::HEALTHY &&
+        tablet_summary.result != cluster_summary::HealthCheckResult::RECOVERING) {
+      VLOG(1) << Substitute("tablet $0: not considering replicas for movement "
+                            "since the tablet's status is '$1'",
+                            tablet_summary.id,
+                            cluster_summary::HealthCheckResultToString(tablet_summary.result));
+      continue;
+    }
+    // Always record the tablet's identifier: otherwise a tablet whose leader's
+    // config index is unknown would end up as a move for a tablet with empty ID.
+    TabletInfo tablet_info;
+    tablet_info.tablet_id = tablet_summary.id;
+    // The leader's config index, if known, is kept for a CAS-like config change.
+    for (const auto& replica_info : tablet_summary.replicas) {
+      if (replica_info.is_leader && replica_info.consensus_state &&
+          replica_info.consensus_state->opid_index) {
+        tablet_info.config_idx = *replica_info.consensus_state->opid_index;
+        break;
+      }
+    }
+    for (const auto& replica_info : tablet_summary.replicas) {
+      if (!ContainsKey(ci.tservers_to_empty, replica_info.ts_uuid)) {
+        continue;
+      }
+      auto& tablets = LookupOrEmplace(
+          &ignored_tservers_info, replica_info.ts_uuid, vector<TabletInfo>());
+      tablets.emplace_back(tablet_info);
+    }
+  }
+  GetMovesFromIgnoredTservers(ignored_tservers_info, replica_moves);
+  return Status::OK();
+}
+
+// Lists the tablet servers along with their states via LeaderMasterProxy and
+// returns IllegalState if any listed server which is present in
+// 'ci.tservers_to_empty' is not in the maintenance mode.
+Status RebalancerTool::IgnoredTserversRunner::CheckIgnoredTserversState(
+    const rebalance::ClusterInfo& ci) {
+  if (ci.tservers_to_empty.empty()) {
+    return Status::OK();
+  }
+
+  LeaderMasterProxy proxy(client_);
+  ListTabletServersRequestPB req;
+  ListTabletServersResponsePB resp;
+  req.set_include_states(true);
+  RETURN_NOT_OK((proxy.SyncRpc<ListTabletServersRequestPB, ListTabletServersResponsePB>(
+      req, &resp, "ListTabletServers", &MasterServiceProxy::ListTabletServersAsync)));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  for (const auto& ts : resp.servers()) {
+    const auto& uuid = ts.instance_id().permanent_uuid();
+    const bool in_maintenance = ts.state() == master::TServerStatePB::MAINTENANCE_MODE;
+    if (ContainsKey(ci.tservers_to_empty, uuid) && !in_maintenance) {
+      return Status::IllegalState(Substitute(
+        "You should set maintenance mode for tablet server $0 first", uuid));
+    }
+  }
+  return Status::OK();
+}
+
+// Output into 'replica_moves' a random batch of moves per tserver in 'ignored_tservers_info'.
+void RebalancerTool::IgnoredTserversRunner::GetMovesFromIgnoredTservers(
+    const IgnoredTserversInfo& ignored_tservers_info,
+    vector<Rebalancer::ReplicaMove>* replica_moves) {
+  DCHECK(replica_moves);
+  if (ignored_tservers_info.empty()) {
+    return;
+  }
+
+  unordered_set<string> tablets_in_move;
+  transform(scheduled_moves_.begin(), scheduled_moves_.end(),
+            inserter(tablets_in_move, tablets_in_move.begin()),
+            [](const Rebalancer::MovesInProgress::value_type& elem) {
+              return elem.first;
+            });
+
+  // A batch is bigger than 'max_moves_per_server_' to avoid repeated calculations.
+  const size_t batch_size = max_moves_per_server_ * 5;
+  vector<Rebalancer::ReplicaMove> result_moves;
+  for (const auto& elem : ignored_tservers_info) {
+    auto tablets_info = elem.second;  // A copy: it's shuffled to pick tablets randomly.
+    shuffle(tablets_info.begin(), tablets_info.end(), random_generator_);
+    for (size_t i = 0; i < tablets_info.size() && i < batch_size; ++i) {
+      if (ContainsKey(tablets_in_move, tablets_info[i].tablet_id)) {
+        continue;
+      }
+      tablets_in_move.emplace(tablets_info[i].tablet_id);
+      ReplicaMove move = {tablets_info[i].tablet_id, elem.first, "", tablets_info[i].config_idx};
+      result_moves.emplace_back(std::move(move));
+    }
+  }
+  *replica_moves = std::move(result_moves);
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/rebalancer_tool.h b/src/kudu/tools/rebalancer_tool.h
index 371031f..112b07c 100644
--- a/src/kudu/tools/rebalancer_tool.h
+++ b/src/kudu/tools/rebalancer_tool.h
@@ -137,7 +137,7 @@ class RebalancerTool : public rebalance::Rebalancer {
   // Runner that leverages RebalancingAlgo interface for rebalancing.
   class AlgoBasedRunner : public BaseRunner {
    public:
-    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // The 'ignored_tservers' specifies tablet servers that could be
     // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
@@ -207,7 +207,7 @@ class RebalancerTool : public rebalance::Rebalancer {
 
   class IntraLocationRunner : public AlgoBasedRunner {
    public:
-    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // The 'ignored_tservers' specifies tablet servers that could be
     // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
@@ -238,7 +238,7 @@ class RebalancerTool : public rebalance::Rebalancer {
 
   class CrossLocationRunner : public AlgoBasedRunner {
    public:
-    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // The 'ignored_tservers' specifies tablet servers that could be
     // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
@@ -267,12 +267,21 @@ class RebalancerTool : public rebalance::Rebalancer {
     rebalance::LocationBalancingAlgo algorithm_;
   };
 
-  class PolicyFixer : public BaseRunner {
+  // Runner that leverages 'SetReplace' method to move replicas.
+  class ReplaceBasedRunner : public BaseRunner {
    public:
-    PolicyFixer(RebalancerTool* rebalancer,
-                std::unordered_set<std::string> ignored_tservers,
-                size_t max_moves_per_server,
-                boost::optional<MonoTime> deadline);
+    // The 'rebalancer' specifies the RebalancerTool object the runner
+    // uses to get information on the cluster.
+    // The 'ignored_tservers' specifies tablet servers that could be
+    // ignored by rebalancer.
+    // The 'max_moves_per_server' specifies the maximum number of operations
+    // per tablet server (both the source and the destination are counted in).
+    // The 'deadline' specifies the deadline for the run, 'boost::none'
+    // if no timeout is set.
+    ReplaceBasedRunner(RebalancerTool* rebalancer,
+                       std::unordered_set<std::string> ignored_tservers,
+                       size_t max_moves_per_server,
+                       boost::optional<MonoTime> deadline);
 
     Status Init(std::vector<std::string> master_addresses) override;
 
@@ -284,12 +293,16 @@ class RebalancerTool : public rebalance::Rebalancer {
                                      bool* timed_out,
                                      bool* has_pending_moves) override;
 
-   private:
+   protected:
     // Key is tserver UUID which corresponds to value.ts_uuid_from.
     typedef std::unordered_multimap<std::string, Rebalancer::ReplicaMove> MovesToSchedule;
 
     Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
 
+    virtual Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
+                                   const rebalance::ClusterRawInfo& raw_info,
+                                   std::vector<Rebalancer::ReplicaMove>* replica_moves) = 0;
+
     bool FindNextMove(Rebalancer::ReplicaMove* move);
 
     // Update the helper containers once a move operation has been scheduled.
@@ -299,6 +312,60 @@ class RebalancerTool : public rebalance::Rebalancer {
     MovesToSchedule moves_to_schedule_;
   };
 
+  // Runner which generates replica moves to restore the placement policy.
+  class PolicyFixer : public ReplaceBasedRunner {
+   public:
+    PolicyFixer(RebalancerTool* rebalancer,
+                std::unordered_set<std::string> ignored_tservers,
+                size_t max_moves_per_server,
+                boost::optional<MonoTime> deadline);
+   private:
+    // Get replica moves to restore the placement policy restrictions.
+    // If it returns Status::OK() with 'replica_moves' empty, the distribution of tablet
+    // replicas is considered to conform to the main constraint of the placement policy.
+    Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
+                           const rebalance::ClusterRawInfo& raw_info,
+                           std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
+  };
+
+  // Runner to move replicas from the tablet servers in ClusterInfo::tservers_to_empty.
+  class IgnoredTserversRunner : public ReplaceBasedRunner {
+   public:
+    IgnoredTserversRunner(RebalancerTool* rebalancer,
+                          std::unordered_set<std::string> ignored_tservers,
+                          size_t max_moves_per_server,
+                          boost::optional<MonoTime> deadline);
+
+   private:
+    struct TabletInfo {
+      std::string tablet_id;
+      boost::optional<int64_t> config_idx;  // For CAS-like change of Raft configs.
+    };
+
+    // Mapping tserver UUID to tablets on it.
+    typedef std::unordered_map<std::string, std::vector<TabletInfo>> IgnoredTserversInfo;
+
+    // Get replica moves to move replicas from healthy ignored tservers.
+    // If returns Status::OK() with replica_moves empty, there would be
+    // no replica on the healthy ignored tservers.
+    Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
+                           const rebalance::ClusterRawInfo& raw_info,
+                           std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
+
+    // Check the state of the tablet servers listed in 'ci.tservers_to_empty': return
+    // Status::OK() only if all of them reported by the master are in maintenance mode.
+    Status CheckIgnoredTserversState(const rebalance::ClusterInfo& ci);
+
+    // Output into 'replica_moves' the moves for tablets from 'ignored_tservers_info'.
+    void GetMovesFromIgnoredTservers(const IgnoredTserversInfo& ignored_tservers_info,
+                                     std::vector<Rebalancer::ReplicaMove>* replica_moves);
+
+    // Random device and generator for selecting among multiple choices, when appropriate.
+    // The device is declared first since it's used to seed the generator upon construction.
+    std::random_device random_device_;
+    std::mt19937 random_generator_;
+  };
+
   // Convert ksck results into information relevant to rebalancing the cluster
   // at the location specified by 'location' parameter ('boost::none' for
   // 'location' means that's about cross-location rebalancing). Basically,
@@ -309,6 +376,10 @@ class RebalancerTool : public rebalance::Rebalancer {
       const KsckResults& ksck_info,
       rebalance::ClusterRawInfo* raw_info);
 
+  // Print replica count information on ClusterInfo::tservers_to_empty.
+  Status PrintIgnoredTserversStats(const rebalance::ClusterInfo& ci,
+                                   std::ostream& out) const;
+
   // Print information on the cross-location balance.
   Status PrintCrossLocationBalanceStats(const rebalance::ClusterInfo& ci,
                                         std::ostream& out) const;
@@ -324,6 +395,10 @@ class RebalancerTool : public rebalance::Rebalancer {
   Status PrintPolicyViolationInfo(const rebalance::ClusterRawInfo& raw_info,
                                   std::ostream& out) const;
 
+  // Check whether it is safe to move all replicas from the ignored to other servers.
+  Status CheckIgnoredServers(const rebalance::ClusterRawInfo& raw_info,
+                             const rebalance::ClusterInfo& cluster_info);
+
   // Run rebalancing using the specified runner.
   Status RunWith(Runner* runner, RunStatus* result_status);
 
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 5875dfb..8bb3d5c 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -61,10 +61,11 @@ DECLARE_string(tablets);
 
 DEFINE_string(ignored_tservers, "",
               "UUIDs of tablet servers to ignore while rebalancing the cluster "
-              "(comma-separated list). If specified, allow to run the rebalancing "
-              "when some tablet servers in 'ignored_tservers' are unhealthy. "
-              "If not specified, allow to run the rebalancing only when all tablet "
-              "servers are healthy.");
+              "(comma-separated list). If specified, the tablet servers are "
+              "effectively ignored by the rebalancer tool, they are not considered "
+              "as a part of the cluster as well as the replicas on them. "
+              "If not specified, the rebalancer tool will run on all the tablet servers "
+              "in the cluster.");
 
 DEFINE_string(sections, "*",
               "Sections to print (comma-separated list of sections, "
@@ -125,6 +126,14 @@ DEFINE_bool(disable_intra_location_rebalancing, false,
             "replica distribution within each location. "
             "This setting is applicable to multi-location clusters only.");
 
+DEFINE_bool(move_replicas_from_ignored_tservers, false,
+            "Whether to move replicas from the specified 'ignored_tservers' to other "
+            "servers when the source tablet server is healthy. "
+            "This setting is effective only if the '--ignored_tservers' flag "
+            "is specified as well. "
+            "If set true, then all ignored tablet servers must be placed into "
+            "the 'maintenance mode'.");
+
 DEFINE_double(load_imbalance_threshold,
               kudu::rebalance::Rebalancer::Config::kLoadImbalanceThreshold,
               "The threshold for the per-table location load imbalance. "
@@ -303,6 +312,7 @@ Status RunRebalance(const RunnerContext& context) {
       FLAGS_max_moves_per_server,
       FLAGS_max_staleness_interval_sec,
       FLAGS_max_run_time_sec,
+      FLAGS_move_replicas_from_ignored_tservers,
       move_single_replicas,
       FLAGS_output_replica_distribution_details,
       !FLAGS_disable_policy_fixer,
@@ -405,6 +415,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("max_moves_per_server")
         .AddOptionalParameter("max_run_time_sec")
         .AddOptionalParameter("max_staleness_interval_sec")
+        .AddOptionalParameter("move_replicas_from_ignored_tservers")
         .AddOptionalParameter("move_single_replicas")
         .AddOptionalParameter("output_replica_distribution_details")
         .AddOptionalParameter("report_only")