You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/05/07 04:17:32 UTC

[kudu] branch master updated: [tools] use thread pool for 'kudu local_replica copy_from_local'

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d32ea18f0 [tools] use thread pool for 'kudu local_replica copy_from_local'
d32ea18f0 is described below

commit d32ea18f017582e1d052ded617a2c13608b0d54b
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Mon Apr 25 20:05:31 2022 +0800

    [tools] use thread pool for 'kudu local_replica copy_from_local'
    
    In practice, copy_from_local is often used to copy a large
    number of tablets. The tool now accepts a comma-separated list
    of tablet IDs for convenience, copies them concurrently with a
    thread pool, and opens each filesystem only once to save time.
    
    This patch also updates the related unit test.
    
    Change-Id: I4de49e948cc1a686db5e1bf424470ca9e800ee36
    Reviewed-on: http://gerrit.cloudera.org:8080/18448
    Tested-by: Kudu Jenkins
    Reviewed-by: Yifan Zhang <ch...@163.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/kudu-tool-test.cc            |  22 ++---
 src/kudu/tools/tool_action_common.cc        |   3 +-
 src/kudu/tools/tool_action_local_replica.cc | 137 ++++++++++++++++++++++------
 3 files changed, 120 insertions(+), 42 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 94ed6dbfe..0d1de64e6 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1133,7 +1133,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "data_size.*Summarize the data size",
         "dump.*Dump a Kudu filesystem",
         "copy_from_remote.*Copy a tablet replica from a remote server",
-        "copy_from_local.*Copy a tablet replica from local filesystem",
+        "copy_from_local.*Copy tablet replicas from local filesystem",
         "delete.*Delete tablet replicas from the local filesystem",
         "list.*Show list of tablet replicas",
     };
@@ -1171,7 +1171,7 @@ TEST_F(ToolTest, TestModeHelp) {
   }
   {
     const vector<string> kLocalReplicaCopyFromRemoteRegexes = {
-        "Copy a tablet replica from local filesystem",
+        "Copy tablet replicas from local filesystem",
     };
     NO_FATALS(RunTestHelp("local_replica copy_from_local --help",
                           kLocalReplicaCopyFromRemoteRegexes));
@@ -8057,16 +8057,14 @@ TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
 
   // Copy source tserver's all replicas from local filesystem.
   string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : "";
-  for (const auto& tablet_id : tablet_ids) {
-    string stdout;
-    NO_FATALS(RunActionStdoutString(Substitute("local_replica copy_from_local $0 $1 $2 $3",
-                                               tablet_id,
-                                               src_fs_paths_with_prefix,
-                                               dst_fs_paths_with_prefix,
-                                               encryption_args),
-                                    &stdout));
-    SCOPED_TRACE(stdout);
-  }
+  string stdout;
+  NO_FATALS(RunActionStdoutString(Substitute("local_replica copy_from_local $0 $1 $2 $3",
+                                             JoinStrings(tablet_ids, ","),
+                                             src_fs_paths_with_prefix,
+                                             dst_fs_paths_with_prefix,
+                                             encryption_args),
+                                  &stdout));
+  SCOPED_TRACE(stdout);
 
   // Replace the old data/wal dirs with the new ones.
   ASSERT_OK(Env::Default()->RenameFile(src_tserver_fs_root, src_tserver_fs_root + ".bak"));
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index b615b71bd..936af96e1 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -133,8 +133,7 @@ DEFINE_string(memtracker_output, "table",
               "the memtracker hierarchy.");
 
 DEFINE_int32(num_threads, 2,
-             "Number of threads to run. Each thread runs its own "
-             "KuduSession.");
+             "Number of threads to run.");
 static bool ValidateNumThreads(const char* flag_name, int32_t flag_value) {
   if (flag_value <= 0) {
     LOG(ERROR) << strings::Substitute("'$0' flag should have a positive value",
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 8dca80a2f..6f0fcfc7f 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -22,6 +22,7 @@
 #include <iostream>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <set>
 #include <string>
 #include <unordered_map>
@@ -82,12 +83,14 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
 
 namespace kudu {
 namespace rpc {
@@ -141,6 +144,7 @@ DEFINE_string(dst_fs_metadata_dir, "",
               "metadata directory if any exists. If none exists, --dst_fs_wal_dir "
               "will be used as the metadata directory.");;
 
+DECLARE_int32(num_threads);
 DECLARE_int32(tablet_copy_download_threads_nums_per_session);
 
 using kudu::consensus::ConsensusMetadata;
@@ -164,6 +168,7 @@ using kudu::tablet::TabletMetadata;
 using kudu::tablet::TabletReplica;
 using kudu::tserver::LocalTabletCopyClient;
 using kudu::tserver::RemoteTabletCopyClient;
+using kudu::tserver::TabletCopyClientMetrics;
 using kudu::tserver::TSTabletManager;
 using std::cout;
 using std::endl;
@@ -438,8 +443,13 @@ Status CopyFromRemote(const RunnerContext& context) {
 }
 
 Status CopyFromLocal(const RunnerContext& context) {
-  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+  const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg);
+  set<string> tablet_ids_to_copy = strings::Split(tablet_ids_str, ",", strings::SkipEmpty());
+  if (tablet_ids_to_copy.empty()) {
+    return Status::InvalidArgument("no tablet identifiers provided");
+  }
 
+  // Open source filesystem.
   FsManagerOpts src_fs_mgr_opt;
   src_fs_mgr_opt.wal_root = FLAGS_src_fs_wal_dir;
   src_fs_mgr_opt.metadata_root = FLAGS_src_fs_metadata_dir;
@@ -447,45 +457,115 @@ Status CopyFromLocal(const RunnerContext& context) {
   FsManager src_fs_manager(Env::Default(), src_fs_mgr_opt);
   RETURN_NOT_OK(src_fs_manager.Open());
 
-  vector<string> tablet_ids;
-  RETURN_NOT_OK(src_fs_manager.ListTabletIds(&tablet_ids));
-  set<string> tablet_ids_set(tablet_ids.begin(), tablet_ids.end());
-  if (!ContainsKey(tablet_ids_set, tablet_id)) {
-    return Status::NotFound("There is no matched tablet");
-  }
-
+  // Open destination filesystem.
   FsManagerOpts dst_fs_mgr_opt;
   dst_fs_mgr_opt.wal_root = FLAGS_dst_fs_wal_dir;
   dst_fs_mgr_opt.metadata_root = FLAGS_dst_fs_metadata_dir;
   dst_fs_mgr_opt.data_roots = strings::Split(FLAGS_dst_fs_data_dirs, ",", strings::SkipEmpty());
-
   FsManager dst_fs_manager(Env::Default(), dst_fs_mgr_opt);
   RETURN_NOT_OK(dst_fs_manager.Open());
   scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager(
       new ConsensusMetadataManager(&dst_fs_manager));
 
-  LocalTabletCopyClient client(tablet_id,
-                               &dst_fs_manager,
-                               dst_cmeta_manager,
-                               /* messenger */ nullptr,
-                               /* tablet_copy_client_metrics */ nullptr,
-                               &src_fs_manager,
-                               /* tablet_copy_source_metrics */ nullptr);
+  // Get all tablet ids in source filesystem.
+  vector<string> src_tablet_ids;
+  RETURN_NOT_OK(src_fs_manager.ListTabletIds(&src_tablet_ids));
+  set<string> src_tablet_ids_set(src_tablet_ids.begin(), src_tablet_ids.end());
+
+  // Prepare to check copy progress.
+  int total_tablet_count = tablet_ids_to_copy.size();
+  // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids'
+  // and 'succeed_tablet_count'.
+  simple_spinlock lock;
+  set<TabletReplica*> copying_replicas;
+  set<string> failed_tablet_ids;
+  int succeed_tablet_count = 0;
+  for (auto tablet_id = tablet_ids_to_copy.begin();
+       tablet_id != tablet_ids_to_copy.end();) {
+    if (!ContainsKey(src_tablet_ids_set, *tablet_id)) {
+      LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.",
+                               *tablet_id);
+      InsertOrDie(&failed_tablet_ids, *tablet_id);
+      tablet_id = tablet_ids_to_copy.erase(tablet_id);
+    } else {
+      tablet_id++;
+    }
+  }
 
-  // Create a fake replica to obtain copy process periodically.
-  scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
+  // Create a thread to obtain copy process periodically.
   CountDownLatch latch(1);
   scoped_refptr<Thread> check_thread;
-  RETURN_NOT_OK(Thread::Create("Kudu-tool", "check_progress",
-                               [&latch, fake_replica]() {
-                                 while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {
-                                   LOG(INFO) << fake_replica->last_status();
-                                }
+  RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress",
+      [&] () {
+        while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {
+          std::lock_guard<simple_spinlock> l(lock);
+          for (const auto& copying_replica : copying_replicas) {
+            LOG(INFO) << Substitute("Tablet $0 copy status: $1",
+                                    copying_replica->tablet_id(),
+                                    copying_replica->last_status());
+          }
+        }
       }, &check_thread));
 
-  RETURN_NOT_OK(client.Start(tablet_id, /* meta */ nullptr));
-  RETURN_NOT_OK(client.FetchAll(fake_replica));
-  RETURN_NOT_OK(client.Finish());
+  // Init TabletCopyClientMetrics.
+  MetricRegistry metric_registry;
+  scoped_refptr<MetricEntity> metric_entity(
+      METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy"));
+  TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity);
+
+  // Create a thread pool to copy tablets.
+  std::unique_ptr<ThreadPool> copy_pool;
+  ThreadPoolBuilder("tool-tablet-copy-pool")
+      .set_max_threads(FLAGS_num_threads)
+      .set_min_threads(FLAGS_num_threads)
+      .Build(&copy_pool);
+
+  // Start to copy tablets.
+  for (const auto& tablet_id : tablet_ids_to_copy) {
+    RETURN_NOT_OK(copy_pool->Submit([&]() {
+      // 'fake_replica' is used for checking copy progress only.
+      scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
+      {
+        std::lock_guard<simple_spinlock> l(lock);
+        LOG(WARNING) << "Start to copy tablet " << tablet_id;
+        InsertOrDie(&copying_replicas, fake_replica.get());
+      }
+
+      LocalTabletCopyClient client(tablet_id,
+                                   &dst_fs_manager,
+                                   dst_cmeta_manager,
+                                   /* messenger */ nullptr,
+                                   &tablet_copy_client_metrics,
+                                   &src_fs_manager,
+                                   /* tablet_copy_source_metrics */ nullptr);
+      Status s = client.Start(tablet_id, /* meta */ nullptr).AndThen([&] {
+        return client.FetchAll(fake_replica);
+      }).AndThen([&] {
+        return client.Finish();
+      });
+
+      {
+        std::lock_guard<simple_spinlock> l(lock);
+        if (!s.ok()) {
+          InsertOrDie(&failed_tablet_ids, tablet_id);
+          LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString());
+        } else {
+          succeed_tablet_count++;
+          LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id);
+        }
+        copying_replicas.erase(fake_replica.get());
+
+        LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.",
+                                succeed_tablet_count + failed_tablet_ids.size(),
+                                total_tablet_count,
+                                tablet_copy_client_metrics.bytes_fetched->value(),
+                                failed_tablet_ids.size());
+      }
+    }));
+  }
+
+  copy_pool->Wait();
+  copy_pool->Shutdown();
   latch.CountDown();
   check_thread->Join();
 
@@ -1041,16 +1121,17 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
 
   unique_ptr<Action> copy_from_local =
       ActionBuilder("copy_from_local", &CopyFromLocal)
-      .Description("Copy a tablet replica from local filesystem. Before using this tool, you "
+      .Description("Copy tablet replicas from local filesystem. Before using this tool, you "
           "MUST stop the master/tserver you want to copy from, and make sure --src_*_dir(s) and "
           "--dst_*_dir(s) are exactly what whey should be.")
-      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc })
       .AddOptionalParameter("src_fs_wal_dir")
       .AddOptionalParameter("src_fs_metadata_dir")
       .AddOptionalParameter("src_fs_data_dirs")
       .AddOptionalParameter("dst_fs_wal_dir")
       .AddOptionalParameter("dst_fs_metadata_dir")
       .AddOptionalParameter("dst_fs_data_dirs")
+      .AddOptionalParameter("num_threads")
       .Build();
 
   unique_ptr<Action> list =