Posted to commits@kudu.apache.org by al...@apache.org on 2022/03/17 23:03:39 UTC

[kudu] branch master updated (c45eba4 -> 91d3101)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from c45eba4  [tool] Fix rebuild master tool's help infomation
     new df65b3a  [write_throttling-itest] relax threshold to avoid flakiness
     new 91d3101  [tools] run intra-location rebalancing in parallel

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../integration-tests/write_throttling-itest.cc    |   2 +-
 src/kudu/rebalance/rebalancer.cc                   |   7 +-
 src/kudu/rebalance/rebalancer.h                    |   7 +-
 src/kudu/tools/rebalancer_tool-test.cc             |  26 ++++
 src/kudu/tools/rebalancer_tool.cc                  | 151 ++++++++++++++++++---
 src/kudu/tools/rebalancer_tool.h                   |  15 +-
 src/kudu/tools/tool_action_cluster.cc              |  13 +-
 7 files changed, 192 insertions(+), 29 deletions(-)

[kudu] 02/02: [tools] run intra-location rebalancing in parallel

Posted by al...@apache.org.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 91d3101008961338078d0b80e5f2e3d1dc552133
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Mar 9 14:57:45 2022 -0800

    [tools] run intra-location rebalancing in parallel
    
    This patch addresses the TODO in RebalancerTool::Run() to run
    intra-location rebalancing in parallel for location-aware Kudu clusters.
    Since the location assignment guarantees that every tablet server
    belongs to exactly one location, the locations of a location-aware Kudu
    cluster partition its tablet servers into non-intersecting sets.  With
    that, it's possible to move replicas within different locations
    independently while still honoring run-time constraints such as those
    set by --max_moves_per_server and other flags.
    
    The new binary was tested on a large production cluster with location
    awareness enabled, where it significantly sped up the rebalancing
    process.  Running the binary in my test cluster showed a significant
    reduction in run times as well.  In theory, running intra-location
    rebalancing in parallel can shorten the run time by up to a factor of N
    compared with running sequentially, where N is the number of locations
    in a Kudu cluster.
    
    Change-Id: Ie4fe3ef3ec2fcac57114c97d5b6cd81d5d9953c4
    Reviewed-on: http://gerrit.cloudera.org:8080/18308
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/rebalance/rebalancer.cc       |   7 +-
 src/kudu/rebalance/rebalancer.h        |   7 +-
 src/kudu/tools/rebalancer_tool-test.cc |  26 ++++++
 src/kudu/tools/rebalancer_tool.cc      | 151 ++++++++++++++++++++++++++++-----
 src/kudu/tools/rebalancer_tool.h       |  15 +++-
 src/kudu/tools/tool_action_cluster.cc  |  13 ++-
 6 files changed, 191 insertions(+), 28 deletions(-)
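
The scheme described in the commit message above runs one rebalancing session per location in parallel, caps the number of concurrent sessions with --intra_location_rebalancing_concurrency (0 meaning the number of CPU cores), and composes the per-location outcomes only after every session has finished. Below is a minimal standard-C++ sketch of that scheme under stated assumptions: it uses plain std::thread workers instead of kudu::ThreadPool and a small struct instead of Status/RunStatus, and the names LocationResult, EffectiveConcurrency, RunPerLocation, and CombineResults are hypothetical, not part of Kudu.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical per-location outcome (the patch keeps Status, RunStatus, and a
// moves counter in separate vectors indexed by location instead).
struct LocationResult {
  bool ok = true;
  std::string error;
  size_t moves = 0;
};

// Hypothetical helper: sessions that actually run at once. 'requested' == 0
// means "number of CPU cores"; the result is capped by the location count.
size_t EffectiveConcurrency(size_t requested, size_t num_locations) {
  if (requested == 0) {
    requested = std::max(1u, std::thread::hardware_concurrency());
  }
  return std::min(requested, num_locations);
}

// Hypothetical helper: runs 'task' once per location on a bounded set of
// worker threads and waits for all of them. Each task writes only its own
// slot in 'results', and join() makes those writes visible to the caller,
// so no mutex is needed around 'results'.
std::vector<LocationResult> RunPerLocation(
    const std::vector<std::string>& locations, size_t requested_concurrency,
    const std::function<LocationResult(const std::string&)>& task) {
  assert(task);
  std::vector<LocationResult> results(locations.size());
  std::atomic<size_t> next{0};
  std::vector<std::thread> workers;
  const size_t n = EffectiveConcurrency(requested_concurrency, locations.size());
  for (size_t w = 0; w < n; ++w) {
    workers.emplace_back([&] {
      // Each worker claims the next unprocessed location until none remain.
      for (size_t i = next++; i < locations.size(); i = next++) {
        results[i] = task(locations[i]);
      }
    });
  }
  for (auto& t : workers) {
    t.join();
  }
  return results;
}

// Hypothetical helper: sums moves of successful locations and joins the
// errors of failed ones, each tagged with its location. Returns an empty
// string if every location succeeded.
std::string CombineResults(const std::vector<std::string>& locations,
                           const std::vector<LocationResult>& results,
                           size_t* total_moves) {
  std::string errors;
  *total_moves = 0;
  for (size_t i = 0; i < results.size(); ++i) {
    if (results[i].ok) {
      *total_moves += results[i].moves;
    } else {
      if (!errors.empty()) {
        errors += "; ";
      }
      errors += "location " + locations[i] + ": " + results[i].error;
    }
  }
  return errors;
}
```

Writing into pre-sized, per-index slots mirrors the patch's location_status and location_moves_count vectors: because no two sessions share a slot, the only synchronization needed is waiting for all workers to finish (Wait() on the pool in the patch, join() here).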

diff --git a/src/kudu/rebalance/rebalancer.cc b/src/kudu/rebalance/rebalancer.cc
index 2d5fdc2..f93dc99 100644
--- a/src/kudu/rebalance/rebalancer.cc
+++ b/src/kudu/rebalance/rebalancer.cc
@@ -73,7 +73,8 @@ Rebalancer::Config::Config(vector<string> ignored_tservers_param,
                            bool run_cross_location_rebalancing,
                            bool run_intra_location_rebalancing,
                            double load_imbalance_threshold,
-                           bool force_rebalance_replicas_on_maintenance_tservers)
+                           bool force_rebalance_replicas_on_maintenance_tservers,
+                           size_t intra_location_rebalancing_concurrency)
     : ignored_tservers(ignored_tservers_param.begin(), ignored_tservers_param.end()),
       master_addresses(std::move(master_addresses)),
       table_filters(std::move(table_filters)),
@@ -88,7 +89,9 @@ Rebalancer::Config::Config(vector<string> ignored_tservers_param,
       run_intra_location_rebalancing(run_intra_location_rebalancing),
       load_imbalance_threshold(load_imbalance_threshold),
       force_rebalance_replicas_on_maintenance_tservers(
-          force_rebalance_replicas_on_maintenance_tservers) {
+          force_rebalance_replicas_on_maintenance_tservers),
+      intra_location_rebalancing_concurrency(
+          intra_location_rebalancing_concurrency) {
   DCHECK_GE(max_moves_per_server, 0);
 }
 
diff --git a/src/kudu/rebalance/rebalancer.h b/src/kudu/rebalance/rebalancer.h
index 15cf5bc..079fbcc 100644
--- a/src/kudu/rebalance/rebalancer.h
+++ b/src/kudu/rebalance/rebalancer.h
@@ -68,7 +68,8 @@ class Rebalancer {
            bool run_cross_location_rebalancing = true,
            bool run_intra_location_rebalancing = true,
            double load_imbalance_threshold = kLoadImbalanceThreshold,
-           bool force_rebalance_replicas_on_maintenance_tservers = false);
+           bool force_rebalance_replicas_on_maintenance_tservers = false,
+           size_t intra_location_rebalancing_concurrency = 0);
 
     // UUIDs of ignored servers. If empty, run the rebalancing on
     // all tablet servers in the cluster only when all tablet servers
@@ -136,6 +137,10 @@ class Rebalancer {
     double load_imbalance_threshold;
 
     bool force_rebalance_replicas_on_maintenance_tservers;
+
+    // The maximum number of intra-location rebalancing sessions that can be run
+    // in parallel. Value of 0 means 'the number of CPU cores at the node'.
+    size_t intra_location_rebalancing_concurrency;
   };
 
   // Represents a concrete move of a replica from one tablet server to another.
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 37437c7..e9af759 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -1759,6 +1759,32 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
   }
 }
 
+// Test that rebalancing tasks are scheduled for each of the existing locations
+// even if running the intra-location rebalancing in a non-concurrent fashion.
+TEST_F(LocationAwareRebalancingBasicTest, IntraLocationNoConcurrency) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, };
+  vector<string> table_names;
+  NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names));
+
+  const vector<string> tool_args = {
+    "cluster",
+    "rebalance",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    "--intra_location_rebalancing_concurrency=1",
+  };
+
+  string out;
+  string err;
+  const auto s = RunKuduTool(tool_args, &out, &err);
+  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
+  ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
+  ASSERT_STR_CONTAINS(err, "starting rebalancing within location '/A'");
+  ASSERT_STR_CONTAINS(err, "starting rebalancing within location '/B'");
+  ASSERT_STR_CONTAINS(err, "starting rebalancing within location '/C'");
+}
+
 class LocationAwareBalanceInfoTest : public RebalancingTest {
  public:
   LocationAwareBalanceInfoTest()
diff --git a/src/kudu/tools/rebalancer_tool.cc b/src/kudu/tools/rebalancer_tool.cc
index 8c53b3e..f08526b 100644
--- a/src/kudu/tools/rebalancer_tool.cc
+++ b/src/kudu/tools/rebalancer_tool.cc
@@ -18,8 +18,8 @@
 #include "kudu/tools/rebalancer_tool.h"
 
 #include <algorithm>
-#include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <iostream>
 #include <iterator>
 #include <map>
@@ -41,6 +41,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/rebalance/cluster_status.h"
 #include "kudu/rebalance/placement_policy_util.h"
@@ -52,7 +53,9 @@
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/tools/tool_replica_util.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 using kudu::cluster_summary::ServerHealth;
 using kudu::cluster_summary::ServerHealthSummary;
@@ -84,6 +87,7 @@ using std::sort;
 using std::string;
 using std::to_string;
 using std::transform;
+using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
@@ -93,7 +97,7 @@ namespace kudu {
 namespace tools {
 
 RebalancerTool::RebalancerTool(const Config& config)
-  : Rebalancer(config) {
+    : Rebalancer(config) {
 }
 
 Status RebalancerTool::PrintStats(ostream& out) {
@@ -140,6 +144,7 @@ Status RebalancerTool::PrintStats(ostream& out) {
   sort(locations.begin(), locations.end());
 
   for (const auto& location : locations) {
+    shared_lock<decltype(ksck_lock_)> guard(ksck_lock_);
     ClusterRawInfo raw_info;
     RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, ksck_->results(), &raw_info));
     ClusterInfo ci;
@@ -163,7 +168,11 @@ Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
   }
 
   ClusterRawInfo raw_info;
-  RETURN_NOT_OK(KsckResultsToClusterRawInfo(boost::none, ksck_->results(), &raw_info));
+  {
+    shared_lock<decltype(ksck_lock_)> guard(ksck_lock_);
+    RETURN_NOT_OK(KsckResultsToClusterRawInfo(
+        boost::none, ksck_->results(), &raw_info));
+  }
 
   ClusterInfo ci;
   RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci));
@@ -237,21 +246,94 @@ Status RebalancerTool::Run(RunStatus* result_status, size_t* moves_count) {
       RETURN_NOT_OK(RunWith(&runner, result_status));
       moves_count_total += runner.moves_count();
     }
-    if (config_.run_intra_location_rebalancing) {
+    if (config_.run_intra_location_rebalancing && !ts_id_by_location.empty()) {
+      const size_t locations_num = ts_id_by_location.size();
+      DCHECK_GT(locations_num, 0);
+
+      vector<RunStatus> location_run_status(locations_num, RunStatus::UNKNOWN);
+      vector<Status> location_status(locations_num, Status::OK());
+      vector<size_t> location_moves_count(locations_num, 0);
+      vector<string> location_by_idx(locations_num);
+
+      // Thread pool to run intra-location rebalancing tasks in parallel. Since
+      // the location assignment provides non-intersecting sets of servers, it's
+      // possible to independently move replicas within different locations.
+      // The pool is automatically shutdown in its destructor.
+      unique_ptr<ThreadPool> rebalance_pool;
+      RETURN_NOT_OK(ThreadPoolBuilder("intra-location-rebalancing")
+                    .set_trace_metric_prefix("rebalancer")
+                    .set_max_threads(
+                        config_.intra_location_rebalancing_concurrency == 0
+                            ? base::NumCPUs()
+                            : config_.intra_location_rebalancing_concurrency)
+                    .Build(&rebalance_pool));
+
       // Run the rebalancing within every location (intra-location rebalancing).
+      size_t location_idx = 0;
       for (const auto& elem : ts_id_by_location) {
-        const auto& location = elem.first;
-        // TODO(aserbin): it would be nice to run these rebalancers in parallel
-        LOG(INFO) << "running rebalancer within location '" << location << "'";
-        IntraLocationRunner runner(this,
-                                   config_.ignored_tservers,
-                                   config_.max_moves_per_server,
-                                   deadline,
-                                   location);
-        RETURN_NOT_OK(runner.Init(config_.master_addresses));
-        RETURN_NOT_OK(RunWith(&runner, result_status));
-        moves_count_total += runner.moves_count();
+        auto location = elem.first;
+        location_by_idx[location_idx] = location;
+        LOG(INFO) << Substitute(
+            "starting rebalancing within location '$0'", location);
+        RETURN_NOT_OK(rebalance_pool->Submit(
+            [this, deadline, location = std::move(location),
+             &config = std::as_const(config_),
+             &location_status = location_status[location_idx],
+             &location_moves_count = location_moves_count[location_idx],
+             &location_run_status = location_run_status[location_idx]]() mutable {
+          IntraLocationRunner runner(this,
+                                     config.ignored_tservers,
+                                     config.max_moves_per_server,
+                                     deadline,
+                                     std::move(location));
+          if (const auto& s = runner.Init(config.master_addresses); !s.ok()) {
+            location_status = s;
+            return;
+          }
+          if (const auto& s = RunWith(&runner, &location_run_status); !s.ok()) {
+            location_status = s;
+            return;
+          }
+          location_moves_count = runner.moves_count();
+        }));
+        ++location_idx;
       }
+      // Wait for the completion of the rebalancing process in every location.
+      rebalance_pool->Wait();
+
+      size_t location_balancing_moves = 0;
+      Status status;
+      RunStatus result_run_status = RunStatus::UNKNOWN;
+      for (size_t location_idx = 0; location_idx < locations_num; ++location_idx) {
+        // This 'for' cycle scope contains logic to compose the overall status
+        // of the intra-location rebalancing based on the statuses of
+        // the individual per-location rebalancing tasks.
+        const auto& s = location_status[location_idx];
+        if (s.ok()) {
+          const auto rs = location_run_status[location_idx];
+          DCHECK(rs != RunStatus::UNKNOWN);
+          if (result_run_status == RunStatus::UNKNOWN ||
+              result_run_status == RunStatus::CLUSTER_IS_BALANCED) {
+            result_run_status = rs;
+          }
+          location_balancing_moves += location_moves_count[location_idx];
+        } else {
+          auto s_with_location_info = s.CloneAndPrepend(Substitute(
+              "location $0", location_by_idx[location_idx]));
+          if (status.ok()) {
+            // Update the overall status to be first seen non-OK status.
+            status = s_with_location_info;
+          } else {
+            // Update the overall status to add info on next non-OK status;
+            status = status.CloneAndAppend(s_with_location_info.message());
+          }
+        }
+      }
+      // Check for the status and bail out if there was an error.
+      RETURN_NOT_OK(status);
+
+      moves_count_total += location_balancing_moves;
+      *result_status = result_run_status;
     }
   }
   if (moves_count != nullptr) {
@@ -651,18 +733,45 @@ Status RebalancerTool::RunWith(Runner* runner, RunStatus* result_status) {
 Status RebalancerTool::GetClusterRawInfo(const boost::optional<string>& location,
                                          ClusterRawInfo* raw_info) {
   RETURN_NOT_OK(RefreshKsckResults());
+  shared_lock<decltype(ksck_lock_)> guard(ksck_lock_);
   return KsckResultsToClusterRawInfo(location, ksck_->results(), raw_info);
 }
 
 Status RebalancerTool::RefreshKsckResults() {
+  std::unique_lock<std::mutex> refresh_guard(ksck_refresh_lock_);
+  if (ksck_refreshing_) {
+    // Other thread is already refreshing the ksck info.
+    ksck_refresh_cv_.wait(refresh_guard, [this]{ return !ksck_refreshing_; });
+    return ksck_refresh_status_;
+  }
+
+  // This thread will be refreshing the ksck info.
+  ksck_refreshing_ = true;
+  refresh_guard.unlock();
+
+  Status refresh_status;
+  SCOPED_CLEANUP({
+    refresh_guard.lock();
+    ksck_refresh_status_ = refresh_status;
+    ksck_refreshing_ = false;
+    refresh_guard.unlock();
+    ksck_refresh_cv_.notify_all();
+  });
   shared_ptr<KsckCluster> cluster;
-  RETURN_NOT_OK_PREPEND(
-      RemoteKsckCluster::Build(config_.master_addresses, &cluster),
-      "unable to build KsckCluster");
+  const auto s = RemoteKsckCluster::Build(config_.master_addresses, &cluster);
+  if (!s.ok()) {
+    refresh_status = s.CloneAndPrepend("unable to build KsckCluster");
+    return refresh_status;
+  }
   cluster->set_table_filters(config_.table_filters);
-  ksck_.reset(new Ksck(cluster));
-  ignore_result(ksck_->Run());
-  return Status::OK();
+
+  {
+    unique_ptr<Ksck> new_ksck(new Ksck(cluster));
+    ignore_result(new_ksck->Run());
+    std::lock_guard<decltype(ksck_lock_)> guard(ksck_lock_);
+    ksck_ = std::move(new_ksck);
+  }
+  return refresh_status;
 }
 
 RebalancerTool::BaseRunner::BaseRunner(RebalancerTool* rebalancer,
diff --git a/src/kudu/tools/rebalancer_tool.h b/src/kudu/tools/rebalancer_tool.h
index 2beda27..80ed12f 100644
--- a/src/kudu/tools/rebalancer_tool.h
+++ b/src/kudu/tools/rebalancer_tool.h
@@ -16,11 +16,13 @@
 // under the License.
 #pragma once
 
-#include <cstddef>
+#include <condition_variable> // IWYU pragma: keep
 #include <cstdint>
+#include <ctime>
 #include <iosfwd>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <random>
 #include <set>
 #include <string>
@@ -33,6 +35,8 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/rebalance/rebalance_algo.h"
 #include "kudu/rebalance/rebalancer.h"
+#include "kudu/tools/ksck.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"     // IWYU pragma: keep
 #include "kudu/util/status.h"
 
@@ -44,7 +48,6 @@ class KuduClient;
 
 namespace tools {
 
-class Ksck;
 struct KsckResults;
 
 // A class implementing logic for Kudu cluster rebalancing.
@@ -404,7 +407,13 @@ class RebalancerTool : public rebalance::Rebalancer {
   Status RefreshKsckResults();
 
   // Auxiliary Ksck object to get information on the cluster.
-  std::shared_ptr<Ksck> ksck_;
+  std::unique_ptr<Ksck> ksck_;    // protected by ksck_lock_
+  rw_spinlock ksck_lock_;
+
+  bool ksck_refreshing_{false};   // protected by ksck_refresh_lock_
+  Status ksck_refresh_status_;    // protected by ksck_refresh_lock_
+  std::mutex ksck_refresh_lock_;
+  std::condition_variable ksck_refresh_cv_;
 };
 
 } // namespace tools
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 1c70264..77bff06 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -91,6 +91,15 @@ DEFINE_uint32(max_staleness_interval_sec, 300,
               "cluster or when some unexpected concurrent activity is "
               "present (such as automatic recovery of failed replicas, etc.)");
 
+DEFINE_uint32(intra_location_rebalancing_concurrency, 0,
+              "How many independent intra-location rebalancing sessions can be "
+              "run in parallel. Since the location assignment naturally provides "
+              "non-intersecting sets of servers, it's possible to "
+              "independently move tablet replicas within different locations "
+              "in parallel. Value of 0 means 'the number of CPU cores'. "
+              "The actual number of concurrent sessions is the minimum of two "
+              "values: this setting and the number of locations in a cluster.");
+
 DEFINE_int64(max_run_time_sec, 0,
              "Maximum time to run the rebalancing, in seconds. Specifying 0 "
              "means not imposing any limit on the rebalancing run time.");
@@ -331,7 +340,8 @@ Status RunRebalance(const RunnerContext& context) {
       !FLAGS_disable_cross_location_rebalancing,
       !FLAGS_disable_intra_location_rebalancing,
       FLAGS_load_imbalance_threshold,
-      FLAGS_force_rebalance_replicas_on_maintenance_tservers));
+      FLAGS_force_rebalance_replicas_on_maintenance_tservers,
+      FLAGS_intra_location_rebalancing_concurrency));
 
   // Print info on pre-rebalance distribution of replicas.
   RETURN_NOT_OK(rebalancer.PrintStats(cout));
@@ -425,6 +435,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("fetch_info_concurrency")
         .AddOptionalParameter("force_rebalance_replicas_on_maintenance_tservers")
         .AddOptionalParameter("ignored_tservers")
+        .AddOptionalParameter("intra_location_rebalancing_concurrency")
         .AddOptionalParameter("load_imbalance_threshold")
         .AddOptionalParameter("max_moves_per_server")
         .AddOptionalParameter("max_run_time_sec")
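
RefreshKsckResults() in the diff above lets only one thread rebuild the Ksck snapshot at a time: concurrent callers wait on ksck_refresh_cv_ and return the status of the in-flight refresh, and readers take ksck_lock_ in shared mode. The same "single-flight refresh" pattern can be sketched with the standard library as follows. This is an illustrative sketch, not Kudu code: SnapshotCache, fetch, RefreshConcurrently, and the version counter are hypothetical, std::shared_mutex stands in for Kudu's rw_spinlock, and a bool stands in for Status.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical cache of an expensive-to-build snapshot (a stand-in for the
// Ksck object). 'fetch' builds a new snapshot and reports success.
class SnapshotCache {
 public:
  explicit SnapshotCache(std::function<bool()> fetch) : fetch_(std::move(fetch)) {
    assert(fetch_);
  }

  // Rebuilds the snapshot. If another thread is already rebuilding it, waits
  // for that rebuild and returns its outcome instead of starting another one.
  bool Refresh() {
    std::unique_lock<std::mutex> l(refresh_mu_);
    if (refreshing_) {
      refresh_cv_.wait(l, [this] { return !refreshing_; });
      return last_ok_;
    }
    refreshing_ = true;
    l.unlock();

    // The expensive part runs without holding refresh_mu_ or data_mu_.
    const bool ok = fetch_();
    builds_.fetch_add(1);
    if (ok) {
      // Exclusive lock only for the cheap swap (here: bumping a version).
      std::unique_lock<std::shared_mutex> w(data_mu_);
      ++version_;
    }

    // Publish the outcome and wake all waiters. This sketch assumes 'fetch'
    // does not throw; the patch uses SCOPED_CLEANUP to reset the flag on
    // every return path.
    l.lock();
    last_ok_ = ok;
    refreshing_ = false;
    l.unlock();
    refresh_cv_.notify_all();
    return ok;
  }

  // Readers share the lock, so they can run concurrently with each other.
  int Version() const {
    std::shared_lock<std::shared_mutex> r(data_mu_);
    return version_;
  }

  int builds() const { return builds_.load(); }

 private:
  std::function<bool()> fetch_;
  mutable std::shared_mutex data_mu_;
  int version_ = 0;          // protected by data_mu_
  std::mutex refresh_mu_;
  std::condition_variable refresh_cv_;
  bool refreshing_ = false;  // protected by refresh_mu_
  bool last_ok_ = true;      // protected by refresh_mu_
  std::atomic<int> builds_{0};
};

// Hypothetical helper: calls Refresh() from 'n' threads at once and returns
// how many of those calls reported success.
int RefreshConcurrently(SnapshotCache& cache, int n) {
  std::atomic<int> successes{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) {
    threads.emplace_back([&] {
      if (cache.Refresh()) {
        ++successes;
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return successes.load();
}
```

How many rebuilds actually happen depends on thread timing, but it is never more than the number of callers, and a caller that arrives while a rebuild is in flight never starts a second one.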

[kudu] 01/02: [write_throttling-itest] relax threshold to avoid flakiness

Posted by al...@apache.org.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit df65b3a3d04115fc38bdb739207123816999557d
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Mar 14 17:13:16 2022 -0700

    [write_throttling-itest] relax threshold to avoid flakiness
    
    I saw a failure [1] of the WriteThrottlingTest.ThrottleWriteRpcPerSec
    test scenario during gerrit pre-commit tests for [2]:
    
      src/kudu/integration-tests/write_throttling-itest.cc:107
      Expected: (qps) <= (TARGET_QPS * 1.2f), actual: 121.467 vs 120
    
    This patch relaxes the upper bound on the measured QPS from 1.2 to
    1.25 times the target QPS.  FWIW, token-based accounting can be
    affected by scheduler anomalies and bursts in the request rate, so an
    occasional overshoot like this one is not unexpected.
    
    [1] http://jenkins.kudu.apache.org/job/kudu-gerrit/25223/
    [2] http://gerrit.cloudera.org:8080/18318
    
    Change-Id: Iad3335595b02e66cdc588755b8f53c77442d5736
    Reviewed-on: http://gerrit.cloudera.org:8080/18320
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/integration-tests/write_throttling-itest.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/kudu/integration-tests/write_throttling-itest.cc b/src/kudu/integration-tests/write_throttling-itest.cc
index 8db7a2a..839db17 100644
--- a/src/kudu/integration-tests/write_throttling-itest.cc
+++ b/src/kudu/integration-tests/write_throttling-itest.cc
@@ -104,7 +104,7 @@ TEST_F(WriteThrottlingTest, ThrottleWriteRpcPerSec) {
     MonoDelta delta = end - begin;
     double qps = TARGET_QPS / delta.ToSeconds();
     LOG(INFO) << "Iteration " << t << " qps: " << qps;
-    ASSERT_LE(qps, TARGET_QPS * 1.2f);
+    ASSERT_LE(qps, TARGET_QPS * 1.25f);
   }
 }
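
The ThrottleWriteRpcPerSec change above relaxes the ceiling because token-based accounting can admit short bursts. As a rough illustration, the sketch below models a hypothetical token bucket with a simulated clock (it is not Kudu's throttler, it does not model scheduler anomalies, and the rate and burst values are assumptions). Measuring n back-to-back requests as n / elapsed, the way the test derives qps from one iteration, yields roughly rate * n / (n - burst) instead of rate, because the first `burst` requests are admitted immediately.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical token bucket driven by a simulated clock; not Kudu's throttler.
class TokenBucket {
 public:
  TokenBucket(double rate_per_sec, double burst)
      : rate_(rate_per_sec), burst_(burst), tokens_(burst) {
    assert(rate_per_sec > 0 && burst >= 1.0);
  }

  // Admits one request no earlier than 'now_sec', waiting for a token if
  // none is available, and returns the admission time.
  double Acquire(double now_sec) {
    Refill(now_sec);
    if (tokens_ < 1.0) {
      now_sec += (1.0 - tokens_) / rate_;
      Refill(now_sec);
    }
    tokens_ -= 1.0;
    return now_sec;
  }

 private:
  // Adds tokens for the time elapsed since the last call, up to 'burst_'.
  void Refill(double now_sec) {
    tokens_ = std::min(burst_, tokens_ + (now_sec - last_sec_) * rate_);
    last_sec_ = now_sec;
  }

  double rate_;
  double burst_;
  double tokens_;
  double last_sec_ = 0.0;
};

// Hypothetical helper: sends 'n' back-to-back requests starting at t=0 and
// returns n / elapsed, where elapsed is the admission time of the last one.
double MeasuredQps(double rate, double burst, int n) {
  assert(n > burst);  // otherwise every request is admitted at t=0
  TokenBucket bucket(rate, burst);
  double now = 0.0;
  for (int i = 0; i < n; ++i) {
    now = bucket.Acquire(now);
  }
  return n / now;
}
```

With a hypothetical rate of 100 requests/s and 100 requests per iteration, a burst allowance of 10 tokens already pushes the measured value to about 111 QPS, and the overshoot grows with the burst size, which is why a fixed ceiling close to the target rate can be flaky.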