You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/04/19 16:42:54 UTC

[kudu] branch master updated: [ksck] Make ksck tool more concurrent

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d17f9fc  [ksck] Make ksck tool more concurrent
d17f9fc is described below

commit d17f9fce684272df28e6291839f162b33796bc5c
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Tue Apr 16 06:12:02 2019 -0400

    [ksck] Make ksck tool more concurrent
    
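    Although ksck already fetches tablet server info concurrently, it is still
    slow to fetch info from a cluster with thousands of tables when the CLI
    and the cluster are in different regions.
    This patch makes ksck also fetch master info and table/tablet info
    concurrently. Because the concurrency flag now sizes these shared fetch
    thread pools, --fetch_replica_info_concurrency is renamed to
    --fetch_info_concurrency.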
    
    Change-Id: I2f3f0e3f5115a46dd3afc83dda75e7241618eea4
    Reviewed-on: http://gerrit.cloudera.org:8080/13024
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/tools/ksck-test.cc           |   6 +-
 src/kudu/tools/ksck.cc                | 193 ++++++++++++++++++----------------
 src/kudu/tools/ksck.h                 |  12 ++-
 src/kudu/tools/ksck_remote.cc         |  85 ++++++++++++---
 src/kudu/tools/ksck_remote.h          |  10 +-
 src/kudu/tools/tool_action_cluster.cc |   2 +
 6 files changed, 195 insertions(+), 113 deletions(-)

diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index 34d7296..528aa64 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -200,7 +200,11 @@ class MockKsckCluster : public KsckCluster {
     return Status::OK();
   }
 
-  virtual Status RetrieveTabletsList(const shared_ptr<KsckTable>& table) override {
+  virtual Status RetrieveAllTablets() override {
+    return Status::OK();
+  }
+
+  virtual Status RetrieveTabletsList(const shared_ptr<KsckTable>& /* unused */) override {
     return Status::OK();
   }
 
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index a49d53f..ab28cdd 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -43,8 +43,8 @@
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/string_case.h"
-#include "kudu/util/threadpool.h"
 
 #define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \
   ::kudu::Status _s = (s); \
@@ -55,8 +55,8 @@
 
 DEFINE_bool(checksum_scan, false,
             "Perform a checksum scan on data in the cluster.");
-DEFINE_int32(fetch_replica_info_concurrency, 20,
-             "Number of concurrent tablet servers to fetch replica info from.");
+DEFINE_int32(fetch_info_concurrency, 20,
+             "Number of threads to fetch info concurrently.");
 
 DEFINE_string(ksck_format, "plain_concise",
               "Output format for ksck. Available options are 'plain_concise', "
@@ -154,60 +154,77 @@ std::ostream& operator<<(std::ostream& lhs, KsckFetchState state) {
 Ksck::Ksck(shared_ptr<KsckCluster> cluster, ostream* out)
     : cluster_(std::move(cluster)),
       out_(out == nullptr ? &std::cout : out) {
+  CHECK_OK(ThreadPoolBuilder("Ksck-fetch")
+               .set_max_threads(FLAGS_fetch_info_concurrency)
+               .set_idle_timeout(MonoDelta::FromMilliseconds(10))
+               .Build(&pool_));
 }
 
 Status Ksck::CheckMasterHealth() {
-  int bad_masters = 0;
-  int unauthorized_masters = 0;
+  int num_masters = static_cast<int>(cluster_->masters().size());
+  if (num_masters == 0) {
+    return Status::NotFound("No masters found");
+  }
+
+  AtomicInt<int32_t> bad_masters(0);
+  AtomicInt<int32_t> unauthorized_masters(0);
+
   vector<KsckServerHealthSummary> master_summaries;
-  // There shouldn't be more than 5 masters, so we'll keep it simple and gather
-  // info in sequence instead of spreading it across a threadpool.
+  simple_spinlock master_summaries_lock;
+
   for (const auto& master : cluster_->masters()) {
-    KsckServerHealthSummary sh;
-    Status s = master->FetchInfo().AndThen([&]() {
+    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+        KsckServerHealthSummary sh;
+        Status s = master->FetchInfo().AndThen([&]() {
           return master->FetchConsensusState();
         });
-    sh.uuid = master->uuid();
-    sh.address = master->address();
-    sh.version = master->version();
-    sh.status = s;
-    if (!s.ok()) {
-      if (IsNotAuthorizedMethodAccess(s)) {
-        sh.health = KsckServerHealth::UNAUTHORIZED;
-        unauthorized_masters++;
-      } else {
-        sh.health = KsckServerHealth::UNAVAILABLE;
-      }
-      bad_masters++;
-    }
-    master_summaries.push_back(std::move(sh));
-
-    // Fetch the flags information.
-    // Failing to gather flags is only a warning.
-    s = master->FetchUnusualFlags();
-    if (!s.ok()) {
-      results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
-          "unable to get flag information for master $0 ($1)",
-          master->uuid(),
-          master->address())));
-    }
-  }
-  results_.master_summaries.swap(master_summaries);
+        sh.uuid = master->uuid();
+        sh.address = master->address();
+        sh.version = master->version();
+        sh.status = s;
+        if (!s.ok()) {
+          if (IsNotAuthorizedMethodAccess(s)) {
+            sh.health = KsckServerHealth::UNAUTHORIZED;
+            unauthorized_masters.Increment();
+          } else {
+            sh.health = KsckServerHealth::UNAVAILABLE;
+          }
+          bad_masters.Increment();
+        }
+
+        {
+          std::lock_guard<simple_spinlock> lock(master_summaries_lock);
+          master_summaries.push_back(std::move(sh));
+        }
+
+        // Fetch the flags information.
+        // Failing to gather flags is only a warning.
+        s = master->FetchUnusualFlags();
+        if (!s.ok()) {
+          std::lock_guard<simple_spinlock> lock(master_summaries_lock);
+          results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
+              "unable to get flag information for master $0 ($1)",
+              master->uuid(),
+              master->address())));
+        }
+    }));
+  }
+  pool_->Wait();
 
-  int num_masters = cluster_->masters().size();
+  results_.master_summaries.swap(master_summaries);
 
   // Return a NotAuthorized status if any master has auth errors, since this
   // indicates ksck may not be able to gather full and accurate info.
-  if (unauthorized_masters > 0) {
+  if (unauthorized_masters.Load() > 0) {
     return Status::NotAuthorized(
         Substitute("failed to gather info from $0 of $1 "
                    "masters due to lack of admin privileges",
-                   unauthorized_masters, num_masters));
+                   unauthorized_masters.Load(), num_masters));
   }
-  if (bad_masters > 0) {
+  if (bad_masters.Load() > 0) {
     return Status::NetworkError(
         Substitute("failed to gather info from all masters: $0 of $1 had errors",
-                   bad_masters, num_masters));
+                   bad_masters.Load(), num_masters));
   }
   return Status::OK();
 }
@@ -304,18 +321,13 @@ Status Ksck::FetchTableAndTabletInfo() {
 
 Status Ksck::FetchInfoFromTabletServers() {
   VLOG(1) << "Fetching the list of tablet servers";
-  int servers_count = cluster_->tablet_servers().size();
+  int servers_count = static_cast<int>(cluster_->tablet_servers().size());
   VLOG(1) << Substitute("List of $0 tablet servers retrieved", servers_count);
 
   if (servers_count == 0) {
-    return Status::NotFound("No tablet servers found");
+    return Status::OK();
   }
 
-  gscoped_ptr<ThreadPool> pool;
-  RETURN_NOT_OK(ThreadPoolBuilder("ksck-fetch")
-                .set_max_threads(FLAGS_fetch_replica_info_concurrency)
-                .Build(&pool));
-
   AtomicInt<int32_t> bad_servers(0);
   AtomicInt<int32_t> unauthorized_servers(0);
   VLOG(1) << "Fetching info from all " << servers_count << " tablet servers";
@@ -325,53 +337,51 @@ Status Ksck::FetchInfoFromTabletServers() {
 
   for (const auto& entry : cluster_->tablet_servers()) {
     const auto& ts = entry.second;
-    CHECK_OK(pool->SubmitFunc([&]() {
-          VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
-          KsckServerHealth health;
-          Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
-                if (FLAGS_consensus) {
-                  return ts->FetchConsensusState(&health);
-                }
-                return Status::OK();
-              });
-          KsckServerHealthSummary summary;
-          summary.uuid = ts->uuid();
-          summary.address = ts->address();
-          summary.ts_location = ts->location();
-          summary.version = ts->version();
-          summary.status = s;
-          if (!s.ok()) {
-            if (IsNotAuthorizedMethodAccess(s)) {
-              health = KsckServerHealth::UNAUTHORIZED;
-              unauthorized_servers.Increment();
+    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+        VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
+        KsckServerHealth health;
+        Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
+            if (FLAGS_consensus) {
+              return ts->FetchConsensusState(&health);
             }
-            bad_servers.Increment();
-          }
-          summary.health = health;
-
-          {
-            std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
-            tablet_server_summaries.push_back(std::move(summary));
+            return Status::OK();
+        });
+        KsckServerHealthSummary summary;
+        summary.uuid = ts->uuid();
+        summary.address = ts->address();
+        summary.ts_location = ts->location();
+        summary.version = ts->version();
+        summary.status = s;
+        if (!s.ok()) {
+          if (IsNotAuthorizedMethodAccess(s)) {
+            health = KsckServerHealth::UNAUTHORIZED;
+            unauthorized_servers.Increment();
           }
-
-          // Fetch the flags information.
-          // Failing to gather flags is only a warning.
-          s = ts->FetchUnusualFlags();
-          if (!s.ok()) {
-            results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
-                    "unable to get flag information for tablet server $0 ($1)",
-                    ts->uuid(),
-                    ts->address())));
-          }
-        }));
-  }
-  pool->Wait();
+          bad_servers.Increment();
+        }
+        summary.health = health;
+
+        {
+          std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+          tablet_server_summaries.push_back(std::move(summary));
+        }
+
+        // Fetch the flags information.
+        // Failing to gather flags is only a warning.
+        s = ts->FetchUnusualFlags();
+        if (!s.ok()) {
+          std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
+          results_.warning_messages.push_back(s.CloneAndPrepend(Substitute(
+              "unable to get flag information for tablet server $0 ($1)",
+              ts->uuid(),
+              ts->address())));
+        }
+    }));
+  }
+  pool_->Wait();
 
   results_.tserver_summaries.swap(tablet_server_summaries);
 
-  if (bad_servers.Load() == 0) {
-    return Status::OK();
-  }
   // Return a NotAuthorized status if any tablet server has auth errors, since
   // this indicates ksck may not be able to gather full and accurate info.
   if (unauthorized_servers.Load() > 0) {
@@ -380,9 +390,12 @@ Status Ksck::FetchInfoFromTabletServers() {
                    "to lack of admin privileges",
                    unauthorized_servers.Load(), servers_count));
   }
-  return Status::NetworkError(
+  if (bad_servers.Load() > 0) {
+    return Status::NetworkError(
       Substitute("failed to gather info for all tablet servers: $0 of $1 had errors",
                  bad_servers.Load(), servers_count));
+  }
+  return Status::OK();
 }
 
 void Ksck::set_print_sections(const std::vector<std::string>& sections) {
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index 90246bb..ca788ab 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -36,6 +36,7 @@
 
 #include "kudu/common/schema.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/server_base.pb.h"
@@ -43,10 +44,12 @@
 #include "kudu/tablet/tablet.pb.h"  // IWYU pragma: keep
 #include "kudu/tools/ksck_results.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 namespace kudu {
 
 class MonoDelta;
+
 namespace rpc {
 class Messenger;
 } // namespace rpc
@@ -407,9 +410,7 @@ class KsckCluster {
     RETURN_NOT_OK(Connect());
     RETURN_NOT_OK(RetrieveTablesList());
     RETURN_NOT_OK(RetrieveTabletServers());
-    for (const std::shared_ptr<KsckTable>& table : tables()) {
-      RETURN_NOT_OK(RetrieveTabletsList(table));
-    }
+    RETURN_NOT_OK(RetrieveAllTablets());
     return Status::OK();
   }
 
@@ -422,6 +423,9 @@ class KsckCluster {
   // Fetches the list of tables.
   virtual Status RetrieveTablesList() = 0;
 
+  // Fetches all tablets in the cluster.
+  virtual Status RetrieveAllTablets() = 0;
+
   // Fetches the list of tablets for the given table.
   // The table's tablet list is modified only if this method returns OK.
   virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) = 0;
@@ -449,6 +453,7 @@ class KsckCluster {
   MasterList masters_;
   TSMap tablet_servers_;
   std::vector<std::shared_ptr<KsckTable>> tables_;
+  gscoped_ptr<ThreadPool> pool_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(KsckCluster);
@@ -571,6 +576,7 @@ class Ksck {
                            int table_num_replicas);
 
   const std::shared_ptr<KsckCluster> cluster_;
+  gscoped_ptr<ThreadPool> pool_;
 
   bool check_replica_count_ = true;
   std::vector<std::string> table_filters_;
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 3c69e75..f217108 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -19,7 +19,9 @@
 
 #include <atomic>
 #include <cstdint>
+#include <functional>
 #include <map>
+#include <mutex>
 #include <ostream>
 #include <unordered_map>
 
@@ -59,12 +61,17 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/version_info.pb.h"
 
 DECLARE_int64(timeout_ms); // defined in tool_action_common
+DECLARE_int32(fetch_info_concurrency);
+
 DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
 
 namespace kudu {
@@ -450,6 +457,19 @@ void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
   ignore_result(stepper.release()); // Deletes self on callback.
 }
 
+RemoteKsckCluster::RemoteKsckCluster(std::vector<std::string> master_addresses,
+                                     std::shared_ptr<rpc::Messenger> messenger)
+    : master_addresses_(std::move(master_addresses)),
+      messenger_(std::move(messenger)) {
+  for (const std::string& master_addr : master_addresses_) {
+    masters_.emplace_back(new RemoteKsckMaster(master_addr, messenger_));
+  }
+  CHECK_OK(ThreadPoolBuilder("RemoteKsckCluster-fetch")
+               .set_max_threads(FLAGS_fetch_info_concurrency)
+               .set_idle_timeout(MonoDelta::FromMilliseconds(10))
+               .Build(&pool_));
+}
+
 Status RemoteKsckCluster::Connect() {
   KuduClientBuilder builder;
   builder.default_rpc_timeout(GetDefaultTimeout());
@@ -496,24 +516,63 @@ Status RemoteKsckCluster::RetrieveTablesList() {
   vector<string> table_names;
   RETURN_NOT_OK(client_->ListTables(&table_names));
 
-  vector<shared_ptr<KsckTable>> tables_temp;
-  for (const auto& n : table_names) {
-    client::sp::shared_ptr<KuduTable> t;
-    RETURN_NOT_OK(client_->OpenTable(n, &t));
+  int num_tables = static_cast<int>(table_names.size());
+  if (num_tables == 0) {
+    return Status::OK();
+  }
+
+  AtomicInt<int32_t> bad_tables(0);
+  vector<shared_ptr<KsckTable>> tables;
+  simple_spinlock tables_lock;
+
+  for (const auto& table_name : table_names) {
+    RETURN_NOT_OK(pool_->SubmitFunc([&]() {
+        client::sp::shared_ptr<KuduTable> t;
+        Status s = client_->OpenTable(table_name, &t);
+        if (!s.ok()) {
+          bad_tables.Increment();
+          LOG(ERROR) << Substitute("unable to open table $0: $1", table_name, s.ToString());
+          return;
+        }
+        shared_ptr<KsckTable> table(new KsckTable(t->id(),
+                                                  table_name,
+                                                  *t->schema().schema_,
+                                                  t->num_replicas()));
+        {
+          std::lock_guard<simple_spinlock> l(tables_lock);
+          tables.push_back(table);
+        }
+    }));
+  }
+  pool_->Wait();
+
+  tables_.swap(tables);
+
+  if (bad_tables.Load() > 0) {
+    return Status::NetworkError(
+        Substitute("failed to gather info from all tables: $0 of $1 had errors",
+                   bad_tables.Load(), num_tables));
+  }
+
+  return Status::OK();
+}
+
+Status RemoteKsckCluster::RetrieveAllTablets() {
+  int num_tables = static_cast<int>(tables().size());
+  if (num_tables == 0) {
+    return Status::OK();
+  }
 
-    shared_ptr<KsckTable> table(new KsckTable(t->id(),
-                                              n,
-                                              *t->schema().schema_,
-                                              t->num_replicas()));
-    tables_temp.push_back(table);
+  for (const shared_ptr<KsckTable>& table : tables()) {
+    RETURN_NOT_OK(pool_->SubmitFunc(
+        std::bind(&KsckCluster::RetrieveTabletsList, this, table)));
   }
-  tables_.swap(tables_temp);
+  pool_->Wait();
+
   return Status::OK();
 }
 
 Status RemoteKsckCluster::RetrieveTabletsList(const shared_ptr<KsckTable>& table) {
-  vector<shared_ptr<KsckTablet>> tablets;
-
   client::sp::shared_ptr<KuduTable> client_table;
   RETURN_NOT_OK(client_->OpenTable(table->name(), &client_table));
 
@@ -522,6 +581,8 @@ Status RemoteKsckCluster::RetrieveTabletsList(const shared_ptr<KsckTable>& table
 
   KuduScanTokenBuilder builder(client_table.get());
   RETURN_NOT_OK(builder.Build(&tokens));
+
+  vector<shared_ptr<KsckTablet>> tablets;
   for (const auto* t : tokens) {
     shared_ptr<KsckTablet> tablet(
         new KsckTablet(table.get(), t->tablet().id()));
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 1872bc8..9e348ac 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -161,6 +161,8 @@ class RemoteKsckCluster : public KsckCluster {
 
   virtual Status RetrieveTablesList() override;
 
+  virtual Status RetrieveAllTablets() override;
+
   virtual Status RetrieveTabletsList(const std::shared_ptr<KsckTable>& table) override;
 
   std::shared_ptr<rpc::Messenger> messenger() const override {
@@ -169,13 +171,7 @@ class RemoteKsckCluster : public KsckCluster {
 
  private:
   RemoteKsckCluster(std::vector<std::string> master_addresses,
-                    std::shared_ptr<rpc::Messenger> messenger)
-      : master_addresses_(std::move(master_addresses)),
-        messenger_(std::move(messenger)) {
-    for (const std::string& master_addr : master_addresses_) {
-      masters_.emplace_back(new RemoteKsckMaster(master_addr, messenger_));
-    }
-  }
+                    std::shared_ptr<rpc::Messenger> messenger);
 
   const std::vector<std::string> master_addresses_;
   const std::shared_ptr<rpc::Messenger> messenger_;
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index db295e5..3d78edf 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -371,6 +371,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("checksum_timeout_sec")
         .AddOptionalParameter("color")
         .AddOptionalParameter("consensus")
+        .AddOptionalParameter("fetch_info_concurrency")
         .AddOptionalParameter("ksck_format")
         .AddOptionalParameter("sections")
         .AddOptionalParameter("tables")
@@ -395,6 +396,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("disable_policy_fixer")
         .AddOptionalParameter("disable_cross_location_rebalancing")
         .AddOptionalParameter("disable_intra_location_rebalancing")
+        .AddOptionalParameter("fetch_info_concurrency")
         .AddOptionalParameter("load_imbalance_threshold")
         .AddOptionalParameter("max_moves_per_server")
         .AddOptionalParameter("max_run_time_sec")