You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/11 18:03:02 UTC

[kudu] 02/03: [rebalance] Add '--ignored_tservers' flag to rebalancer

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0212da22aa3c170439ab59d6d2fc52ac7aa42caf
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Mon May 13 19:10:14 2019 +0800

    [rebalance] Add '--ignored_tservers' flag to rebalancer
    
    The '--ignored_tservers' flag lets users run the rebalancing even
    if there are some known dead tservers in the cluster. These
    tservers are down but have not been removed from the cluster
    ('ksck' also keeps reporting them unless the masters are restarted).
    If '--ignored_tservers' is specified, the rebalancer skips these
    tservers and runs on the other, healthy tservers.
    
    Change-Id: Ie83a7d2497b778833f3f96fa8aa20e7486c5ebbb
    Reviewed-on: http://gerrit.cloudera.org:8080/13539
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/tools/rebalance-test.cc       |  2 ++
 src/kudu/tools/rebalancer.cc           | 62 +++++++++++++++++++++++++---------
 src/kudu/tools/rebalancer.h            | 25 +++++++++++++-
 src/kudu/tools/rebalancer_tool-test.cc | 46 ++++++++++++++++++-------
 src/kudu/tools/tool_action_cluster.cc  | 11 ++++++
 5 files changed, 116 insertions(+), 30 deletions(-)

diff --git a/src/kudu/tools/rebalance-test.cc b/src/kudu/tools/rebalance-test.cc
index badac31..807dc82 100644
--- a/src/kudu/tools/rebalance-test.cc
+++ b/src/kudu/tools/rebalance-test.cc
@@ -231,6 +231,7 @@ class KsckResultsToClusterBalanceInfoTest : public ::testing::Test {
 // of RF=1 replicas is allowed.
 TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
   const Rebalancer::Config rebalancer_config = {
+    {},     // ignored_tservers
     {},     // master_addresses
     {},     // table_filters
     5,      // max_moves_per_server
@@ -367,6 +368,7 @@ TEST_F(KsckResultsToClusterBalanceInfoTest, MoveRf1Replicas) {
 // RF=1 replicas is disabled.
 TEST_F(KsckResultsToClusterBalanceInfoTest, DoNotMoveRf1Replicas) {
   const Rebalancer::Config rebalancer_config = {
+    {},     // ignored_tservers
     {},     // master_addresses
     {},     // table_filters
     5,      // max_moves_per_server
diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc
index 4291806..55422bc 100644
--- a/src/kudu/tools/rebalancer.cc
+++ b/src/kudu/tools/rebalancer.cc
@@ -80,6 +80,7 @@ namespace kudu {
 namespace tools {
 
 Rebalancer::Config::Config(
+    std::vector<std::string> ignored_tservers_param,
     std::vector<std::string> master_addresses,
     std::vector<std::string> table_filters,
     size_t max_moves_per_server,
@@ -91,7 +92,8 @@ Rebalancer::Config::Config(
     bool run_cross_location_rebalancing,
     bool run_intra_location_rebalancing,
     double load_imbalance_threshold)
-    : master_addresses(std::move(master_addresses)),
+    : ignored_tservers(ignored_tservers_param.begin(), ignored_tservers_param.end()),
+      master_addresses(std::move(master_addresses)),
       table_filters(std::move(table_filters)),
       max_moves_per_server(max_moves_per_server),
       max_staleness_interval_sec(max_staleness_interval_sec),
@@ -111,11 +113,8 @@ Rebalancer::Rebalancer(const Config& config)
 
 Status Rebalancer::PrintStats(ostream& out) {
   // First, report on the current balance state of the cluster.
-  RETURN_NOT_OK(RefreshKsckResults());
-  const KsckResults& results = ksck_->results();
-
   ClusterRawInfo raw_info;
-  RETURN_NOT_OK(KsckResultsToClusterRawInfo(boost::none, results, &raw_info));
+  RETURN_NOT_OK(GetClusterRawInfo(boost::none, &raw_info));
 
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
@@ -154,7 +153,7 @@ Status Rebalancer::PrintStats(ostream& out) {
 
   for (const auto& location : locations) {
     ClusterRawInfo raw_info;
-    RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info));
+    RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, ksck_->results(), &raw_info));
     ClusterInfo ci;
     RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
     RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out));
@@ -198,7 +197,7 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
     const auto& location = ts_id_by_location.cbegin()->first;
     LOG(INFO) << "running whole-cluster rebalancing";
     IntraLocationRunner runner(
-        this, config_.max_moves_per_server, deadline, location);
+        this, config_.ignored_tservers, config_.max_moves_per_server, deadline, location);
     RETURN_NOT_OK(runner.Init(config_.master_addresses));
     RETURN_NOT_OK(RunWith(&runner, result_status));
     moves_count_total += runner.moves_count();
@@ -219,7 +218,8 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
     if (config_.run_policy_fixer) {
       // Fix placement policy violations, if any.
       LOG(INFO) << "fixing placement policy violations";
-      PolicyFixer runner(this, config_.max_moves_per_server, deadline);
+      PolicyFixer runner(
+          this, config_.ignored_tservers, config_.max_moves_per_server, deadline);
       RETURN_NOT_OK(runner.Init(config_.master_addresses));
       RETURN_NOT_OK(RunWith(&runner, result_status));
       moves_count_total += runner.moves_count();
@@ -228,6 +228,7 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
       // Run the rebalancing across locations (inter-location rebalancing).
       LOG(INFO) << "running cross-location rebalancing";
       CrossLocationRunner runner(this,
+                                 config_.ignored_tservers,
                                  config_.max_moves_per_server,
                                  config_.load_imbalance_threshold,
                                  deadline);
@@ -241,8 +242,11 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) {
         const auto& location = elem.first;
         // TODO(aserbin): it would be nice to run these rebalancers in parallel
         LOG(INFO) << "running rebalancer within location '" << location << "'";
-        IntraLocationRunner runner(
-            this, config_.max_moves_per_server, deadline, location);
+        IntraLocationRunner runner(this,
+                                   config_.ignored_tservers,
+                                   config_.max_moves_per_server,
+                                   deadline,
+                                   location);
         RETURN_NOT_OK(runner.Init(config_.master_addresses));
         RETURN_NOT_OK(RunWith(&runner, result_status));
         moves_count_total += runner.moves_count();
@@ -952,9 +956,11 @@ Status Rebalancer::RefreshKsckResults() {
 }
 
 Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer,
+                                   std::unordered_set<std::string> ignored_tservers,
                                    size_t max_moves_per_server,
                                    boost::optional<MonoTime> deadline)
     : rebalancer_(rebalancer),
+      ignored_tservers_(std::move(ignored_tservers)),
       max_moves_per_server_(max_moves_per_server),
       deadline_(std::move(deadline)),
       moves_count_(0) {
@@ -1017,9 +1023,13 @@ void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) {
 
 Rebalancer::AlgoBasedRunner::AlgoBasedRunner(
     Rebalancer* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
     size_t max_moves_per_server,
     boost::optional<MonoTime> deadline)
-    : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+    : BaseRunner(rebalancer,
+                 std::move(ignored_tservers),
+                 max_moves_per_server,
+                 std::move(deadline)),
       random_generator_(random_device_()) {
 }
 
@@ -1217,10 +1227,14 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl(
   RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(loc, &raw_info));
 
   // For simplicity, allow to run the rebalancing only when all tablet servers
-  // are in good shape. Otherwise, the rebalancing might interfere with the
+  // are in good shape (except those specified in 'ignored_tservers_').
+  // Otherwise, the rebalancing might interfere with the
   // automatic re-replication or get unexpected errors while moving replicas.
   for (const auto& s : raw_info.tserver_summaries) {
     if (s.health != KsckServerHealth::HEALTHY) {
+      if (ContainsKey(ignored_tservers_, s.uuid)) {
+        continue;
+      }
       return Status::IllegalState(
           Substitute("tablet server $0 ($1): unacceptable health status $2",
                      s.uuid, s.address, ServerHealthToString(s.health)));
@@ -1424,26 +1438,38 @@ void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl(
 
 Rebalancer::IntraLocationRunner::IntraLocationRunner(
     Rebalancer* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
     size_t max_moves_per_server,
     boost::optional<MonoTime> deadline,
     std::string location)
-    : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+    : AlgoBasedRunner(rebalancer,
+                      std::move(ignored_tservers),
+                      max_moves_per_server,
+                      std::move(deadline)),
       location_(std::move(location)) {
 }
 
 Rebalancer::CrossLocationRunner::CrossLocationRunner(Rebalancer* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
     size_t max_moves_per_server,
     double load_imbalance_threshold,
     boost::optional<MonoTime> deadline)
-    : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)),
+    : AlgoBasedRunner(rebalancer,
+                      std::move(ignored_tservers),
+                      max_moves_per_server,
+                      std::move(deadline)),
       algorithm_(load_imbalance_threshold) {
 }
 
 Rebalancer::PolicyFixer::PolicyFixer(
     Rebalancer* rebalancer,
+    std::unordered_set<std::string> ignored_tservers,
     size_t max_moves_per_server,
     boost::optional<MonoTime> deadline)
-    : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)) {
+    : BaseRunner(rebalancer,
+                 std::move(ignored_tservers),
+                 max_moves_per_server,
+                 std::move(deadline)) {
 }
 
 Status Rebalancer::PolicyFixer::Init(vector<string> master_addresses) {
@@ -1580,11 +1606,15 @@ Status Rebalancer::PolicyFixer::GetNextMovesImpl(
   RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(boost::none, &raw_info));
 
   // For simplicity, allow to run the rebalancing only when all tablet servers
-  // are in good shape. Otherwise, the rebalancing might interfere with the
+  // are in good shape (except those specified in 'ignored_tservers_').
+  // Otherwise, the rebalancing might interfere with the
   // automatic re-replication or get unexpected errors while moving replicas.
   // TODO(aserbin): move it somewhere else?
   for (const auto& s : raw_info.tserver_summaries) {
     if (s.health != KsckServerHealth::HEALTHY) {
+      if (ContainsKey(ignored_tservers_, s.uuid)) {
+        continue;
+      }
       return Status::IllegalState(
           Substitute("tablet server $0 ($1): unacceptable health status $2",
                      s.uuid, s.address, ServerHealthToString(s.health)));
diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h
index cb7fd11..617f33a 100644
--- a/src/kudu/tools/rebalancer.h
+++ b/src/kudu/tools/rebalancer.h
@@ -25,6 +25,7 @@
 #include <set>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -65,7 +66,9 @@ class Rebalancer {
   struct Config {
     static constexpr double kLoadImbalanceThreshold = 1.0;
 
-    Config(std::vector<std::string> master_addresses = {},
+    // NOLINTNEXTLINE(google-explicit-constructor)
+    Config(std::vector<std::string> ignored_tservers_param = {},
+           std::vector<std::string> master_addresses = {},
            std::vector<std::string> table_filters = {},
            size_t max_moves_per_server = 5,
            size_t max_staleness_interval_sec = 300,
@@ -77,6 +80,12 @@ class Rebalancer {
            bool run_intra_location_rebalancing = true,
            double load_imbalance_threshold = kLoadImbalanceThreshold);
 
+    // UUIDs of tablet servers to ignore. If empty, the rebalancing is
+    // allowed to run only when all tablet servers in the cluster are
+    // healthy. If not empty, the rebalancing is allowed to run even when
+    // the tablet servers listed in ignored_tservers are unhealthy.
+    std::unordered_set<std::string> ignored_tservers;
+
     // Kudu masters' RPC endpoints.
     std::vector<std::string> master_addresses;
 
@@ -199,6 +208,7 @@ class Rebalancer {
   class BaseRunner : public Runner {
    public:
     BaseRunner(Rebalancer* rebalancer,
+               std::unordered_set<std::string> ignored_tservers,
                size_t max_moves_per_server,
                boost::optional<MonoTime> deadline);
 
@@ -227,6 +237,9 @@ class Rebalancer {
     // A pointer to the Rebalancer object.
     Rebalancer* rebalancer_;
 
+    // A set of ignored tablet server UUIDs.
+    const std::unordered_set<std::string> ignored_tservers_;
+
     // Maximum allowed number of move operations per server. For a move
     // operation, a source replica adds +1 at the source server and the target
     // replica adds +1 at the destination server.
@@ -261,12 +274,15 @@ class Rebalancer {
   // Runner that leverages RebalancingAlgo interface for rebalancing.
   class AlgoBasedRunner : public BaseRunner {
    public:
+    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
     // The 'deadline' specifies the deadline for the run, 'boost::none'
     // if no timeout is set. If 'location' is boost::none, rebalance across
     // locations.
     AlgoBasedRunner(Rebalancer* rebalancer,
+                    std::unordered_set<std::string> ignored_tservers,
                     size_t max_moves_per_server,
                     boost::optional<MonoTime> deadline);
 
@@ -326,6 +342,8 @@ class Rebalancer {
 
   class IntraLocationRunner : public AlgoBasedRunner {
    public:
+    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
     // The 'deadline' specifies the deadline for the run, 'boost::none'
@@ -333,6 +351,7 @@ class Rebalancer {
     // is just one location defined in the whole cluster, the whole cluster will
     // be rebalanced.
     IntraLocationRunner(Rebalancer* rebalancer,
+                        std::unordered_set<std::string> ignored_tservers,
                         size_t max_moves_per_server,
                         boost::optional<MonoTime> deadline,
                         std::string location);
@@ -354,6 +373,8 @@ class Rebalancer {
 
   class CrossLocationRunner : public AlgoBasedRunner {
    public:
+    // The 'ignored_tservers' specifies dead tablet servers that could be
+    // ignored by rebalancer.
     // The 'max_moves_per_server' specifies the maximum number of operations
     // per tablet server (both the source and the destination are counted in).
     // The 'load_imbalance_threshold' specified the threshold for the
@@ -361,6 +382,7 @@ class Rebalancer {
     // The 'deadline' specifies the deadline for the run, 'boost::none'
     // if no timeout is set.
     CrossLocationRunner(Rebalancer* rebalancer,
+                        std::unordered_set<std::string> ignored_tservers,
                         size_t max_moves_per_server,
                         double load_imbalance_threshold,
                         boost::optional<MonoTime> deadline);
@@ -383,6 +405,7 @@ class Rebalancer {
   class PolicyFixer : public BaseRunner {
    public:
     PolicyFixer(Rebalancer* rebalancer,
+                std::unordered_set<std::string> ignored_tservers,
                 size_t max_moves_per_server,
                 boost::optional<MonoTime> deadline);
 
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 4e432ed..3641204 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -172,6 +172,8 @@ Per-table replica distribution summary:
 }
 
 // Make sure the rebalancer doesn't start if a tablet server is down.
+// The rebalancer starts only when the dead tablet server is in the
+// list of ignored_tservers.
 class RebalanceStartCriteriaTest :
     public AdminCliTest,
     public ::testing::WithParamInterface<Kudu1097> {
@@ -199,19 +201,37 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
     ts->Shutdown();
   }
 
-  string out;
-  string err;
-  Status s = RunKuduTool({
-    "cluster",
-    "rebalance",
-    cluster_->master()->bound_rpc_addr().ToString()
-  }, &out, &err);
-  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
-  const auto err_msg_pattern = Substitute(
-      "Illegal state: tablet server .* \\($0\\): "
-      "unacceptable health status UNAVAILABLE",
-      ts_host_port.ToString());
-  ASSERT_STR_MATCHES(err, err_msg_pattern);
+  // Rebalancer doesn't start if a tablet server is down.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString()
+    }, &out, &err);
+    ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
+    const auto err_msg_pattern = Substitute(
+        "Illegal state: tablet server .* \\($0\\): "
+        "unacceptable health status UNAVAILABLE",
+        ts_host_port.ToString());
+    ASSERT_STR_MATCHES(err, err_msg_pattern);
+  }
+
+  // Rebalancer starts when specifying the dead tablet server in 'ignored_tservers'.
+  {
+    string out;
+    string err;
+    Status s = RunKuduTool({
+      "cluster",
+      "rebalance",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--ignored_tservers=" + cluster_->tablet_server(0)->uuid()
+    }, &out, &err);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+    ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced (moved 0 replicas)")
+        << "stderr: " << err;
+  }
 }
 
 static Status CreateTables(
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 7bbe8d0..28698bf 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -64,6 +64,13 @@ using strings::Substitute;
 DECLARE_string(tables);
 DECLARE_string(tablets);
 
+DEFINE_string(ignored_tservers, "",
+              "UUIDs of tablet servers to ignore while rebalancing the cluster "
+              "(comma-separated list). If specified, allow to run the rebalancing "
+              "when some tablet servers in 'ignored_tservers' are unhealthy. "
+              "If not specified, allow to run the rebalancing only when all tablet "
+              "servers are healthy.");
+
 DEFINE_string(sections, "*",
               "Sections to print (comma-separated list of sections, "
               "available sections are: MASTER_SUMMARIES, TSERVER_SUMMARIES, "
@@ -281,6 +288,8 @@ Status EvaluateMoveSingleReplicasFlag(const vector<string>& master_addresses,
 // can be the source and the destination of no more than the specified number of
 // move operations.
 Status RunRebalance(const RunnerContext& context) {
+  const vector<string> ignored_tservers =
+      Split(FLAGS_ignored_tservers, ",", strings::SkipEmpty());
   vector<string> master_addresses;
   RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
   const vector<string> table_filters =
@@ -293,6 +302,7 @@ Status RunRebalance(const RunnerContext& context) {
   RETURN_NOT_OK(EvaluateMoveSingleReplicasFlag(master_addresses,
                                                &move_single_replicas));
   Rebalancer rebalancer(Rebalancer::Config(
+      ignored_tservers,
       master_addresses,
       table_filters,
       FLAGS_max_moves_per_server,
@@ -395,6 +405,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("disable_cross_location_rebalancing")
         .AddOptionalParameter("disable_intra_location_rebalancing")
         .AddOptionalParameter("fetch_info_concurrency")
+        .AddOptionalParameter("ignored_tservers")
         .AddOptionalParameter("load_imbalance_threshold")
         .AddOptionalParameter("max_moves_per_server")
         .AddOptionalParameter("max_run_time_sec")