You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/08/02 02:40:08 UTC

[kudu] branch master updated: [Tools] Copy tablets from a remote server in batch

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 17528c7a8 [Tools] Copy tablets from a remote server in batch
17528c7a8 is described below

commit 17528c7a8ab1e44a421d535346e849cfaf37d3d1
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Thu Jul 14 18:40:50 2022 +0800

    [Tools] Copy tablets from a remote server in batch
    
    The command 'kudu local_replica copy_from_remote' only supports copying one
    tablet from a remote server at a time. This is inefficient when we need
    to copy all tablets from a remote server.
    
    Therefore, this patch adds support for copying tablets from a remote server in batch.
    
    Change-Id: Ib598142883b8ab958625a4f04648d58ea95f3664
    Reviewed-on: http://gerrit.cloudera.org:8080/18732
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/tablet/tablet_replica.h            |   5 +-
 src/kudu/tools/kudu-tool-test.cc            |  39 +++-
 src/kudu/tools/tool_action_local_replica.cc | 283 +++++++++++++++++-----------
 3 files changed, 212 insertions(+), 115 deletions(-)

diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index ead1fb830..099111766 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -83,8 +83,7 @@ class ResultTracker;
 } // namespace rpc
 
 namespace tools {
-struct RunnerContext;
-Status CopyFromLocal(const RunnerContext& context);
+class TabletCopier;
 } // namespace tools
 
 namespace tablet {
@@ -395,7 +394,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb);
 
  private:
-  friend Status kudu::tools::CopyFromLocal(const kudu::tools::RunnerContext& context);
+  friend class kudu::tools::TabletCopier;
   friend class kudu::AlterTableTest;
   friend class RefCountedThreadSafe<TabletReplica>;
   friend class TabletReplicaTest;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 697edc4a3..e568eecff 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -213,6 +213,7 @@ using kudu::tserver::TabletServerErrorPB;
 using kudu::tserver::WriteRequestPB;
 using std::back_inserter;
 using std::copy;
+using std::find;
 using std::make_pair;
 using std::map;
 using std::max;
@@ -1237,7 +1238,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "cmeta.*Operate on a local tablet replica's consensus",
         "data_size.*Summarize the data size",
         "dump.*Dump a Kudu filesystem",
-        "copy_from_remote.*Copy a tablet replica from a remote server",
+        "copy_from_remote.*Copy tablet replicas from a remote server",
         "copy_from_local.*Copy tablet replicas from local filesystem",
         "delete.*Delete tablet replicas from the local filesystem",
         "list.*Show list of tablet replicas",
@@ -1266,7 +1267,7 @@ TEST_F(ToolTest, TestModeHelp) {
   }
   {
     const vector<string> kLocalReplicaCopyFromRemoteRegexes = {
-        "Copy a tablet replica from a remote server",
+        "Copy tablet replicas from a remote server",
     };
     NO_FATALS(RunTestHelp("local_replica copy_from_remote --help",
                           kLocalReplicaCopyFromRemoteRegexes));
@@ -8179,6 +8180,40 @@ TEST_F(ToolTest, TestLocalReplicaCopyLocal) {
   ASSERT_EQ(src_stdout, dst_stdout);
 }
 
+TEST_F(ToolTest, TestLocalReplicaCopyRemote) {  // Batch-copies tserver 0's tablets to tserver 1 with the CLI tool.
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = 2;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+  NO_FATALS(CreateTableWithFlushedData("table1", mini_cluster_.get(), 3, 1));
+  NO_FATALS(CreateTableWithFlushedData("table2", mini_cluster_.get(), 3, 1));
+  int source_tserver_tablet_count = mini_cluster_->mini_tablet_server(0)->ListTablets().size();
+  int target_tserver_tablet_count_before = mini_cluster_->mini_tablet_server(1)
+                                                        ->ListTablets().size();
+  string tablet_ids_str = JoinStrings(mini_cluster_->mini_tablet_server(0)->ListTablets(), ",");  // CSV of ids.
+  string source_tserver_rpc_addr = mini_cluster_->mini_tablet_server(0)
+                                                ->bound_rpc_addr().ToString();
+  string wal_dir = mini_cluster_->mini_tablet_server(1)->options()->fs_opts.wal_root;
+  string data_dirs = JoinStrings(mini_cluster_->mini_tablet_server(1)
+                                              ->options()->fs_opts.data_roots, ",");
+  NO_FATALS(mini_cluster_->mini_tablet_server(1)->Shutdown());  // Presumably so the tool can use its dirs; confirm.
+  // Copy all tablet replicas of tserver0 into tserver1's WAL/data directories, using 3 threads.
+  NO_FATALS(RunActionStdoutNone(
+      Substitute("local_replica copy_from_remote $0 $1 "
+                 "-fs_data_dirs=$2 -fs_wal_dir=$3 -num_threads=3",
+                 tablet_ids_str,
+                 source_tserver_rpc_addr,
+                 data_dirs,
+                 wal_dir)));
+  NO_FATALS(mini_cluster_->mini_tablet_server(1)->Start());  // Restart the target before listing its tablets.
+  const vector<string>& target_tablet_ids = mini_cluster_->mini_tablet_server(1)->ListTablets();
+  ASSERT_EQ(source_tserver_tablet_count + target_tserver_tablet_count_before,  // Old tablets plus copied ones.
+            target_tablet_ids.size());
+  for (string tablet_id : mini_cluster_->mini_tablet_server(0)->ListTablets()) {  // Every source id must be present.
+    ASSERT_TRUE(find(target_tablet_ids.begin(), target_tablet_ids.end(), tablet_id)
+                != target_tablet_ids.end());
+  }
+}
+
 TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
   // Local copies are not supported on encrypted severs at this time.
   if (FLAGS_encrypt_data_at_rest) {
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 82cc0cfff..afe405837 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -168,11 +168,13 @@ using kudu::tablet::TabletMetadata;
 using kudu::tablet::TabletReplica;
 using kudu::tserver::LocalTabletCopyClient;
 using kudu::tserver::RemoteTabletCopyClient;
+using kudu::tserver::TabletCopyClient;
 using kudu::tserver::TabletCopyClientMetrics;
 using kudu::tserver::TSTabletManager;
 using std::cout;
 using std::endl;
 using std::map;
+using std::move;
 using std::pair;
 using std::shared_ptr;
 using std::set;
@@ -206,6 +208,160 @@ string Indent(int indent) {
 }
 } // anonymous namespace
 
+class TabletCopier {  // Copies a batch of tablets into a destination FsManager using a thread pool.
+ public:
+  TabletCopier(set<string> tablet_ids_to_copy,  // Remote-source constructor: selects FROM_REMOTE.
+               FsManager* dst_fs_manager,
+               scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager,
+               HostPort source_addr) :
+                  tablet_ids_to_copy_(move(tablet_ids_to_copy)),
+                  dst_fs_manager_(dst_fs_manager),
+                  dst_cmeta_manager_(move(dst_cmeta_manager)),
+                  source_addr_(move(source_addr)),
+                  copy_type_(CopyType::FROM_REMOTE) {  // src_fs_manager_ is not initialized by this constructor.
+  }
+
+  TabletCopier(set<string> tablet_ids_to_copy,  // Local-source constructor: selects FROM_LOCAL.
+               FsManager* dst_fs_manager,
+               scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager,
+               FsManager* src_fs_manager,
+               set<string> src_tablet_ids_set) :
+                  tablet_ids_to_copy_(move(tablet_ids_to_copy)),
+                  dst_fs_manager_(dst_fs_manager),
+                  dst_cmeta_manager_(move(dst_cmeta_manager)),
+                  src_fs_manager_(move(src_fs_manager)),
+                  src_tablet_ids_set_(move(src_tablet_ids_set)),
+                  copy_type_(CopyType::FROM_LOCAL) {  // source_addr_ is default-constructed in this mode.
+  }
+
+  ~TabletCopier() = default;
+
+  Status CopyTablets() {  // Runs one copy task per tablet id, logs progress, then waits for all tasks.
+    // Bookkeeping used for progress reporting while the copies run.
+    int total_tablet_count = tablet_ids_to_copy_.size();  // Taken before filtering, so it includes rejected ids.
+    // 'lock' protects 'copying_replicas', 'failed_tablet_ids' and
+    // 'succeed_tablet_count', which are shared with the worker tasks and the progress thread.
+    simple_spinlock lock;
+    set<TabletReplica*> copying_replicas;
+    set<string> failed_tablet_ids;
+    int succeed_tablet_count = 0;
+    if (copy_type_ == CopyType::FROM_LOCAL) {  // Reject ids that are absent from the source id set up front.
+      for (auto tablet_id = tablet_ids_to_copy_.begin();
+          tablet_id != tablet_ids_to_copy_.end();) {
+        if (!ContainsKey(src_tablet_ids_set_, *tablet_id)) {
+          LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.",
+                                   *tablet_id);
+          InsertOrDie(&failed_tablet_ids, *tablet_id);
+          tablet_id = tablet_ids_to_copy_.erase(tablet_id);  // std::set::erase returns the following iterator.
+        } else {
+          tablet_id++;
+        }
+      }
+    }
+
+    // Create a thread that periodically logs the copy progress of the in-flight tablets.
+    CountDownLatch latch(1);  // Counted down at the end of this function to stop the progress thread.
+    scoped_refptr<Thread> check_thread;
+    RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress",
+        [&] () {
+          while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {  // Assumes WaitFor() is false on timeout; confirm.
+            std::lock_guard<simple_spinlock> l(lock);
+            for (const auto& copying_replica : copying_replicas) {
+              LOG(INFO) << Substitute("Tablet $0 copy status: $1",
+                                      copying_replica->tablet_id(),
+                                      copying_replica->last_status());
+            }
+          }
+        }, &check_thread));
+
+    // Set up the TabletCopyClientMetrics instance shared by all copy clients ('bytes_fetched' is logged below).
+    MetricRegistry metric_registry;
+    scoped_refptr<MetricEntity> metric_entity(
+        METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy"));
+    TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity);
+
+    // Create a thread pool, with min and max threads both set to --num_threads, to run the copies.
+    std::unique_ptr<ThreadPool> copy_pool;
+    ThreadPoolBuilder("tool-tablet-copy-pool")
+        .set_max_threads(FLAGS_num_threads)
+        .set_min_threads(FLAGS_num_threads)
+        .Build(&copy_pool);  // NOTE(review): Build()'s result is discarded here; confirm it cannot fail.
+
+    shared_ptr<Messenger> messenger;  // Handed to RemoteTabletCopyClient only; built in both modes.
+    RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger));  // NOTE(review): presumably returns early on error while 'check_thread' still references locals; confirm.
+    // Submit one copy task per remaining tablet id.
+    for (const auto& tablet_id : tablet_ids_to_copy_) {
+      RETURN_NOT_OK(copy_pool->Submit([&]() {  // NOTE(review): same early-return concern as above; confirm.
+        // 'fake_replica' only carries the copy status that the progress thread reads and logs.
+        scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
+        {
+          std::lock_guard<simple_spinlock> l(lock);
+          LOG(WARNING) << "Start to copy tablet " << tablet_id;
+          InsertOrDie(&copying_replicas, fake_replica.get());
+        }
+        Status s;
+        unique_ptr<TabletCopyClient> client;  // Concrete client type depends on copy_type_.
+        if (copy_type_ == CopyType::FROM_REMOTE) {
+          client.reset(new RemoteTabletCopyClient(tablet_id, dst_fs_manager_, dst_cmeta_manager_,
+                                                  messenger, &tablet_copy_client_metrics));
+          s = client->Start(source_addr_, nullptr);
+        } else {
+          CHECK_EQ(copy_type_, CopyType::FROM_LOCAL);
+          client.reset(new LocalTabletCopyClient(tablet_id, dst_fs_manager_,
+                                                 dst_cmeta_manager_, /* messenger */ nullptr,
+                                                 &tablet_copy_client_metrics, src_fs_manager_,
+                                                 /* tablet_copy_source_metrics */ nullptr));
+          s = client->Start(tablet_id, /* meta */ nullptr);
+        }
+        s = s.AndThen([&] {  // Presumably each chained step runs only if the previous Status is OK; confirm.
+          return client->FetchAll(fake_replica);
+        }).AndThen([&] {
+          return client->Finish();
+        });
+        {
+          std::lock_guard<simple_spinlock> l(lock);  // Update the shared counters and log the overall progress.
+          if (!s.ok()) {
+            InsertOrDie(&failed_tablet_ids, tablet_id);
+            LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString());
+          } else {
+            succeed_tablet_count++;
+            LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id);
+          }
+          copying_replicas.erase(fake_replica.get());  // Removed before 'fake_replica' goes out of scope.
+
+          LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.",
+                                  succeed_tablet_count + failed_tablet_ids.size(),
+                                  total_tablet_count,
+                                  tablet_copy_client_metrics.bytes_fetched->value(),
+                                  failed_tablet_ids.size());
+        }
+        return Status::OK();  // A failed copy is recorded in 'failed_tablet_ids', not returned from the task.
+      }));
+    }
+
+    copy_pool->Wait();  // Wait for the submitted tasks before stopping the progress thread.
+    copy_pool->Shutdown();
+    latch.CountDown();
+    check_thread->Join();
+
+    return Status::OK();  // Returned even if some tablets failed; failures are only logged above.
+  }
+
+ private:
+  enum CopyType {  // Selects which TabletCopyClient implementation CopyTablets() creates.
+    FROM_LOCAL,
+    FROM_REMOTE,
+  };
+
+  set<string> tablet_ids_to_copy_;  // Ids to copy; FROM_LOCAL mode erases ids missing from the source set.
+  FsManager* dst_fs_manager_;  // Raw pointer passed to each copy client; never deleted by this class.
+  scoped_refptr<consensus::ConsensusMetadataManager> dst_cmeta_manager_;  // Passed to each copy client.
+  FsManager* src_fs_manager_;  // Set only by the FROM_LOCAL constructor; read only in the FROM_LOCAL branch.
+  const set<string> src_tablet_ids_set_;  // Ids checked against before a FROM_LOCAL copy; empty in remote mode.
+  const HostPort source_addr_;  // Passed to RemoteTabletCopyClient::Start() in FROM_REMOTE mode.
+  CopyType copy_type_;  // Fixed by whichever constructor was used.
+};
+
 Status FsInit(bool skip_block_manager, unique_ptr<FsManager>* fs_manager) {
   FsManagerOpts fs_opts;
   fs_opts.read_only = true;
@@ -423,23 +579,21 @@ Status SetRaftTerm(const RunnerContext& context) {
 
 Status CopyFromRemote(const RunnerContext& context) {
   // Parse the tablet ID and source arguments.
-  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
-  const string& rpc_address = FindOrDie(context.required_args, "source");
+  const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg);
+  set<string> tablet_ids_to_copy = Split(tablet_ids_str, ",", strings::SkipWhitespace());
+  if (tablet_ids_to_copy.empty())
+    return Status::InvalidArgument("no tablet identifiers provided");
 
+  const string& rpc_address = FindOrDie(context.required_args, "source");
   HostPort hp;
   RETURN_NOT_OK(ParseHostPortString(rpc_address, &hp));
 
-  // Copy the tablet over.
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
   scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
-  shared_ptr<Messenger> messenger;
-  RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger));
-  RemoteTabletCopyClient client(tablet_id, &fs_manager, cmeta_manager,
-                                messenger, nullptr /* no metrics */);
-  RETURN_NOT_OK(client.Start(hp, nullptr));
-  RETURN_NOT_OK(client.FetchAll(nullptr));
-  return client.Finish();
+
+  TabletCopier copier(move(tablet_ids_to_copy), &fs_manager, move(cmeta_manager), move(hp));
+  return copier.CopyTablets();
 }
 
 Status CopyFromLocal(const RunnerContext& context) {
@@ -472,104 +626,12 @@ Status CopyFromLocal(const RunnerContext& context) {
   RETURN_NOT_OK(src_fs_manager.ListTabletIds(&src_tablet_ids));
   set<string> src_tablet_ids_set(src_tablet_ids.begin(), src_tablet_ids.end());
 
-  // Prepare to check copy progress.
-  int total_tablet_count = tablet_ids_to_copy.size();
-  // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids'
-  // and 'succeed_tablet_count'.
-  simple_spinlock lock;
-  set<TabletReplica*> copying_replicas;
-  set<string> failed_tablet_ids;
-  int succeed_tablet_count = 0;
-  for (auto tablet_id = tablet_ids_to_copy.begin();
-       tablet_id != tablet_ids_to_copy.end();) {
-    if (!ContainsKey(src_tablet_ids_set, *tablet_id)) {
-      LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.",
-                               *tablet_id);
-      InsertOrDie(&failed_tablet_ids, *tablet_id);
-      tablet_id = tablet_ids_to_copy.erase(tablet_id);
-    } else {
-      tablet_id++;
-    }
-  }
-
-  // Create a thread to obtain copy process periodically.
-  CountDownLatch latch(1);
-  scoped_refptr<Thread> check_thread;
-  RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress",
-      [&] () {
-        while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {
-          std::lock_guard<simple_spinlock> l(lock);
-          for (const auto& copying_replica : copying_replicas) {
-            LOG(INFO) << Substitute("Tablet $0 copy status: $1",
-                                    copying_replica->tablet_id(),
-                                    copying_replica->last_status());
-          }
-        }
-      }, &check_thread));
-
-  // Init TabletCopyClientMetrics.
-  MetricRegistry metric_registry;
-  scoped_refptr<MetricEntity> metric_entity(
-      METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy"));
-  TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity);
-
-  // Create a thread pool to copy tablets.
-  std::unique_ptr<ThreadPool> copy_pool;
-  ThreadPoolBuilder("tool-tablet-copy-pool")
-      .set_max_threads(FLAGS_num_threads)
-      .set_min_threads(FLAGS_num_threads)
-      .Build(&copy_pool);
-
-  // Start to copy tablets.
-  for (const auto& tablet_id : tablet_ids_to_copy) {
-    RETURN_NOT_OK(copy_pool->Submit([&]() {
-      // 'fake_replica' is used for checking copy progress only.
-      scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
-      {
-        std::lock_guard<simple_spinlock> l(lock);
-        LOG(WARNING) << "Start to copy tablet " << tablet_id;
-        InsertOrDie(&copying_replicas, fake_replica.get());
-      }
-
-      LocalTabletCopyClient client(tablet_id,
-                                   &dst_fs_manager,
-                                   dst_cmeta_manager,
-                                   /* messenger */ nullptr,
-                                   &tablet_copy_client_metrics,
-                                   &src_fs_manager,
-                                   /* tablet_copy_source_metrics */ nullptr);
-      Status s = client.Start(tablet_id, /* meta */ nullptr).AndThen([&] {
-        return client.FetchAll(fake_replica);
-      }).AndThen([&] {
-        return client.Finish();
-      });
-
-      {
-        std::lock_guard<simple_spinlock> l(lock);
-        if (!s.ok()) {
-          InsertOrDie(&failed_tablet_ids, tablet_id);
-          LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString());
-        } else {
-          succeed_tablet_count++;
-          LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id);
-        }
-        copying_replicas.erase(fake_replica.get());
-
-        LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.",
-                                succeed_tablet_count + failed_tablet_ids.size(),
-                                total_tablet_count,
-                                tablet_copy_client_metrics.bytes_fetched->value(),
-                                failed_tablet_ids.size());
-      }
-    }));
-  }
-
-  copy_pool->Wait();
-  copy_pool->Shutdown();
-  latch.CountDown();
-  check_thread->Join();
-
-  return Status::OK();
+  TabletCopier copier(move(tablet_ids_to_copy),
+                      &dst_fs_manager,
+                      move(dst_cmeta_manager),
+                      &src_fs_manager,
+                      move(src_tablet_ids_set));
+  return copier.CopyTablets();
 }
 
 Status DeleteLocalReplica(const string& tablet_id,
@@ -1109,14 +1171,15 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
 
   unique_ptr<Action> copy_from_remote =
       ActionBuilder("copy_from_remote", &CopyFromRemote)
-      .Description("Copy a tablet replica from a remote server")
-      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .Description("Copy tablet replicas from a remote server")
+      .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc })
       .AddRequiredParameter({ "source", "Source RPC address of "
                               "form hostname:port" })
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("fs_metadata_dir")
       .AddOptionalParameter("fs_wal_dir")
       .AddOptionalParameter("tablet_copy_download_threads_nums_per_session")
+      .AddOptionalParameter("num_threads")
       .Build();
 
   unique_ptr<Action> copy_from_local =