You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/08/02 02:40:08 UTC
[kudu] branch master updated: [Tools] Copy tablets from a remote server in batch
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 17528c7a8 [Tools] Copy tablets from a remote server in batch
17528c7a8 is described below
commit 17528c7a8ab1e44a421d535346e849cfaf37d3d1
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Thu Jul 14 18:40:50 2022 +0800
[Tools] Copy tablets from a remote server in batch
The command 'kudu local_replica copy_from_remote' only supports copying one
tablet from a remote server at a time. This is inefficient when we need
to copy all tablets from a remote server.
Therefore, this patch adds support for copying tablets from a remote server in batch.
Change-Id: Ib598142883b8ab958625a4f04648d58ea95f3664
Reviewed-on: http://gerrit.cloudera.org:8080/18732
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
src/kudu/tablet/tablet_replica.h | 5 +-
src/kudu/tools/kudu-tool-test.cc | 39 +++-
src/kudu/tools/tool_action_local_replica.cc | 283 +++++++++++++++++-----------
3 files changed, 212 insertions(+), 115 deletions(-)
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index ead1fb830..099111766 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -83,8 +83,7 @@ class ResultTracker;
} // namespace rpc
namespace tools {
-struct RunnerContext;
-Status CopyFromLocal(const RunnerContext& context);
+class TabletCopier;
} // namespace tools
namespace tablet {
@@ -395,7 +394,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
void BeginTxnParticipantOp(int64_t txn_id, RegisteredTxnCallback began_txn_cb);
private:
- friend Status kudu::tools::CopyFromLocal(const kudu::tools::RunnerContext& context);
+ friend class kudu::tools::TabletCopier;
friend class kudu::AlterTableTest;
friend class RefCountedThreadSafe<TabletReplica>;
friend class TabletReplicaTest;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 697edc4a3..e568eecff 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -213,6 +213,7 @@ using kudu::tserver::TabletServerErrorPB;
using kudu::tserver::WriteRequestPB;
using std::back_inserter;
using std::copy;
+using std::find;
using std::make_pair;
using std::map;
using std::max;
@@ -1237,7 +1238,7 @@ TEST_F(ToolTest, TestModeHelp) {
"cmeta.*Operate on a local tablet replica's consensus",
"data_size.*Summarize the data size",
"dump.*Dump a Kudu filesystem",
- "copy_from_remote.*Copy a tablet replica from a remote server",
+ "copy_from_remote.*Copy tablet replicas from a remote server",
"copy_from_local.*Copy tablet replicas from local filesystem",
"delete.*Delete tablet replicas from the local filesystem",
"list.*Show list of tablet replicas",
@@ -1266,7 +1267,7 @@ TEST_F(ToolTest, TestModeHelp) {
}
{
const vector<string> kLocalReplicaCopyFromRemoteRegexes = {
- "Copy a tablet replica from a remote server",
+ "Copy tablet replicas from a remote server",
};
NO_FATALS(RunTestHelp("local_replica copy_from_remote --help",
kLocalReplicaCopyFromRemoteRegexes));
@@ -8179,6 +8180,40 @@ TEST_F(ToolTest, TestLocalReplicaCopyLocal) {
ASSERT_EQ(src_stdout, dst_stdout);
}
+// Verifies that 'local_replica copy_from_remote' copies several tablet replicas
+// from a running tablet server (tserver0) into the data directories of a
+// stopped one (tserver1) in a single invocation.
+TEST_F(ToolTest, TestLocalReplicaCopyRemote) {
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = 2;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+  NO_FATALS(CreateTableWithFlushedData("table1", mini_cluster_.get(), 3, 1));
+  NO_FATALS(CreateTableWithFlushedData("table2", mini_cluster_.get(), 3, 1));
+
+  // Snapshot the source tablet IDs once; the same list drives the tool's
+  // argument and the final verification.
+  const vector<string> source_tablet_ids =
+      mini_cluster_->mini_tablet_server(0)->ListTablets();
+  // Keep the counts as size_t so they compare against size() without narrowing.
+  const size_t target_tserver_tablet_count_before =
+      mini_cluster_->mini_tablet_server(1)->ListTablets().size();
+  const string tablet_ids_str = JoinStrings(source_tablet_ids, ",");
+  const string source_tserver_rpc_addr =
+      mini_cluster_->mini_tablet_server(0)->bound_rpc_addr().ToString();
+  const string wal_dir = mini_cluster_->mini_tablet_server(1)->options()->fs_opts.wal_root;
+  const string data_dirs = JoinStrings(
+      mini_cluster_->mini_tablet_server(1)->options()->fs_opts.data_roots, ",");
+
+  // The tool operates on tserver1's directories, so tserver1 is shut down
+  // before the copy and restarted afterwards.
+  NO_FATALS(mini_cluster_->mini_tablet_server(1)->Shutdown());
+  // Copy tablet replicas from tserver0 to tserver1.
+  NO_FATALS(RunActionStdoutNone(
+      Substitute("local_replica copy_from_remote $0 $1 "
+                 "-fs_data_dirs=$2 -fs_wal_dir=$3 -num_threads=3",
+                 tablet_ids_str,
+                 source_tserver_rpc_addr,
+                 data_dirs,
+                 wal_dir)));
+  NO_FATALS(mini_cluster_->mini_tablet_server(1)->Start());
+
+  // tserver1 must now host its original tablets plus every tablet of tserver0.
+  const vector<string> target_tablet_ids = mini_cluster_->mini_tablet_server(1)->ListTablets();
+  ASSERT_EQ(source_tablet_ids.size() + target_tserver_tablet_count_before,
+            target_tablet_ids.size());
+  for (const auto& tablet_id : source_tablet_ids) {
+    ASSERT_TRUE(find(target_tablet_ids.begin(), target_tablet_ids.end(), tablet_id)
+                != target_tablet_ids.end());
+  }
+}
+
+
TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
// Local copies are not supported on encrypted severs at this time.
if (FLAGS_encrypt_data_at_rest) {
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 82cc0cfff..afe405837 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -168,11 +168,13 @@ using kudu::tablet::TabletMetadata;
using kudu::tablet::TabletReplica;
using kudu::tserver::LocalTabletCopyClient;
using kudu::tserver::RemoteTabletCopyClient;
+using kudu::tserver::TabletCopyClient;
using kudu::tserver::TabletCopyClientMetrics;
using kudu::tserver::TSTabletManager;
using std::cout;
using std::endl;
using std::map;
+using std::move;
using std::pair;
using std::shared_ptr;
using std::set;
@@ -206,6 +208,160 @@ string Indent(int indent) {
}
} // anonymous namespace
+// Copies a set of tablet replicas into a destination filesystem, either from a
+// remote server (RemoteTabletCopyClient) or from another local filesystem
+// (LocalTabletCopyClient). Which one is used is fixed by the constructor.
+// Copies run concurrently on a pool of FLAGS_num_threads threads.
+class TabletCopier {
+ public:
+  // Creates a copier that fetches 'tablet_ids_to_copy' from the server at
+  // 'source_addr' into 'dst_fs_manager'.
+  TabletCopier(set<string> tablet_ids_to_copy,
+               FsManager* dst_fs_manager,
+               scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager,
+               HostPort source_addr) :
+      tablet_ids_to_copy_(move(tablet_ids_to_copy)),
+      dst_fs_manager_(dst_fs_manager),
+      dst_cmeta_manager_(move(dst_cmeta_manager)),
+      // Unused for remote copies; initialized so the member is never indeterminate.
+      src_fs_manager_(nullptr),
+      source_addr_(move(source_addr)),
+      copy_type_(CopyType::FROM_REMOTE) {
+  }
+
+  // Creates a copier that copies 'tablet_ids_to_copy' from 'src_fs_manager'
+  // into 'dst_fs_manager'. 'src_tablet_ids_set' is the set of tablet IDs that
+  // exist in the source filesystem; requested IDs missing from it are reported
+  // as failed without being attempted.
+  TabletCopier(set<string> tablet_ids_to_copy,
+               FsManager* dst_fs_manager,
+               scoped_refptr<ConsensusMetadataManager> dst_cmeta_manager,
+               FsManager* src_fs_manager,
+               set<string> src_tablet_ids_set) :
+      tablet_ids_to_copy_(move(tablet_ids_to_copy)),
+      dst_fs_manager_(dst_fs_manager),
+      dst_cmeta_manager_(move(dst_cmeta_manager)),
+      src_fs_manager_(src_fs_manager),
+      src_tablet_ids_set_(move(src_tablet_ids_set)),
+      copy_type_(CopyType::FROM_LOCAL) {
+  }
+
+  ~TabletCopier() = default;
+
+  // Copies every tablet in 'tablet_ids_to_copy_' and blocks until all submitted
+  // copies have finished.
+  //
+  // Returns a non-OK status only if the infrastructure (thread pool, messenger,
+  // progress thread, task submission) could not be set up. A failure to copy an
+  // individual tablet is logged and counted, but does not change the result.
+  Status CopyTablets() {
+    // Prepare to check copy progress.
+    int total_tablet_count = tablet_ids_to_copy_.size();
+    // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids'
+    // and 'succeed_tablet_count'.
+    simple_spinlock lock;
+    set<TabletReplica*> copying_replicas;
+    set<string> failed_tablet_ids;
+    int succeed_tablet_count = 0;
+    if (copy_type_ == CopyType::FROM_LOCAL) {
+      // Drop (and record as failed) the tablets the source filesystem lacks.
+      for (auto tablet_id = tablet_ids_to_copy_.begin();
+           tablet_id != tablet_ids_to_copy_.end();) {
+        if (!ContainsKey(src_tablet_ids_set_, *tablet_id)) {
+          LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.",
+                                   *tablet_id);
+          InsertOrDie(&failed_tablet_ids, *tablet_id);
+          tablet_id = tablet_ids_to_copy_.erase(tablet_id);
+        } else {
+          ++tablet_id;
+        }
+      }
+    }
+
+    // Init TabletCopyClientMetrics.
+    MetricRegistry metric_registry;
+    scoped_refptr<MetricEntity> metric_entity(
+        METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy"));
+    TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity);
+
+    // Create a thread pool to copy tablets. Everything that can fail with an
+    // early return is set up before any thread that references the locals of
+    // this function is started.
+    std::unique_ptr<ThreadPool> copy_pool;
+    RETURN_NOT_OK(ThreadPoolBuilder("tool-tablet-copy-pool")
+                  .set_max_threads(FLAGS_num_threads)
+                  .set_min_threads(FLAGS_num_threads)
+                  .Build(&copy_pool));
+
+    shared_ptr<Messenger> messenger;
+    RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger));
+
+    // Create a thread to obtain copy process periodically.
+    CountDownLatch latch(1);
+    scoped_refptr<Thread> check_thread;
+    RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress",
+        [&] () {
+          while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {
+            std::lock_guard<simple_spinlock> l(lock);
+            for (const auto& copying_replica : copying_replicas) {
+              LOG(INFO) << Substitute("Tablet $0 copy status: $1",
+                                      copying_replica->tablet_id(),
+                                      copying_replica->last_status());
+            }
+          }
+        }, &check_thread));
+
+    // Start to copy tablets. A submission failure stops further submissions,
+    // but the function still falls through to the drain/join code below: the
+    // tasks and the progress thread capture locals by reference, so none of
+    // them may outlive this call.
+    Status submit_status;
+    for (const auto& tablet_id : tablet_ids_to_copy_) {
+      submit_status = copy_pool->Submit([&]() {
+        // 'fake_replica' is used for checking copy progress only.
+        scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
+        {
+          std::lock_guard<simple_spinlock> l(lock);
+          LOG(WARNING) << "Start to copy tablet " << tablet_id;
+          InsertOrDie(&copying_replicas, fake_replica.get());
+        }
+        Status s;
+        unique_ptr<TabletCopyClient> client;
+        if (copy_type_ == CopyType::FROM_REMOTE) {
+          client.reset(new RemoteTabletCopyClient(tablet_id, dst_fs_manager_, dst_cmeta_manager_,
+                                                  messenger, &tablet_copy_client_metrics));
+          s = client->Start(source_addr_, nullptr);
+        } else {
+          CHECK_EQ(copy_type_, CopyType::FROM_LOCAL);
+          client.reset(new LocalTabletCopyClient(tablet_id, dst_fs_manager_,
+                                                 dst_cmeta_manager_, /* messenger */ nullptr,
+                                                 &tablet_copy_client_metrics, src_fs_manager_,
+                                                 /* tablet_copy_source_metrics */ nullptr));
+          s = client->Start(tablet_id, /* meta */ nullptr);
+        }
+        s = s.AndThen([&] {
+          return client->FetchAll(fake_replica);
+        }).AndThen([&] {
+          return client->Finish();
+        });
+        {
+          std::lock_guard<simple_spinlock> l(lock);
+          if (!s.ok()) {
+            InsertOrDie(&failed_tablet_ids, tablet_id);
+            LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString());
+          } else {
+            succeed_tablet_count++;
+            LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id);
+          }
+          copying_replicas.erase(fake_replica.get());
+
+          LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.",
+                                  succeed_tablet_count + failed_tablet_ids.size(),
+                                  total_tablet_count,
+                                  tablet_copy_client_metrics.bytes_fetched->value(),
+                                  failed_tablet_ids.size());
+        }
+        return Status::OK();
+      });
+      if (!submit_status.ok()) {
+        break;
+      }
+    }
+
+    // Drain the workers first, then stop and join the progress thread.
+    copy_pool->Wait();
+    copy_pool->Shutdown();
+    latch.CountDown();
+    check_thread->Join();
+
+    return submit_status;
+  }
+
+ private:
+  // Selects which copy client CopyTablets() instantiates.
+  enum CopyType {
+    FROM_LOCAL,
+    FROM_REMOTE,
+  };
+
+  // Tablets still to be copied; FROM_LOCAL copies prune IDs missing at the source.
+  set<string> tablet_ids_to_copy_;
+  FsManager* dst_fs_manager_;
+  scoped_refptr<consensus::ConsensusMetadataManager> dst_cmeta_manager_;
+  // Set only by the FROM_LOCAL constructor; nullptr for remote copies.
+  FsManager* src_fs_manager_;
+  // Tablet IDs present in the source filesystem (FROM_LOCAL only).
+  const set<string> src_tablet_ids_set_;
+  // Address of the source server (FROM_REMOTE only).
+  const HostPort source_addr_;
+  CopyType copy_type_;
+};
+
+
Status FsInit(bool skip_block_manager, unique_ptr<FsManager>* fs_manager) {
FsManagerOpts fs_opts;
fs_opts.read_only = true;
@@ -423,23 +579,21 @@ Status SetRaftTerm(const RunnerContext& context) {
Status CopyFromRemote(const RunnerContext& context) {
// Parse the tablet ID and source arguments.
- const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
- const string& rpc_address = FindOrDie(context.required_args, "source");
+ const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg);
+ set<string> tablet_ids_to_copy = Split(tablet_ids_str, ",", strings::SkipWhitespace());
+ if (tablet_ids_to_copy.empty())
+ return Status::InvalidArgument("no tablet identifiers provided");
+ const string& rpc_address = FindOrDie(context.required_args, "source");
HostPort hp;
RETURN_NOT_OK(ParseHostPortString(rpc_address, &hp));
- // Copy the tablet over.
FsManager fs_manager(Env::Default(), FsManagerOpts());
RETURN_NOT_OK(fs_manager.Open());
scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
- shared_ptr<Messenger> messenger;
- RETURN_NOT_OK(BuildMessenger("tablet_copy_client", &messenger));
- RemoteTabletCopyClient client(tablet_id, &fs_manager, cmeta_manager,
- messenger, nullptr /* no metrics */);
- RETURN_NOT_OK(client.Start(hp, nullptr));
- RETURN_NOT_OK(client.FetchAll(nullptr));
- return client.Finish();
+
+ TabletCopier copier(move(tablet_ids_to_copy), &fs_manager, move(cmeta_manager), move(hp));
+ return copier.CopyTablets();
}
Status CopyFromLocal(const RunnerContext& context) {
@@ -472,104 +626,12 @@ Status CopyFromLocal(const RunnerContext& context) {
RETURN_NOT_OK(src_fs_manager.ListTabletIds(&src_tablet_ids));
set<string> src_tablet_ids_set(src_tablet_ids.begin(), src_tablet_ids.end());
- // Prepare to check copy progress.
- int total_tablet_count = tablet_ids_to_copy.size();
- // 'lock' is used for protecting 'copying_replicas', 'failed_tablet_ids'
- // and 'succeed_tablet_count'.
- simple_spinlock lock;
- set<TabletReplica*> copying_replicas;
- set<string> failed_tablet_ids;
- int succeed_tablet_count = 0;
- for (auto tablet_id = tablet_ids_to_copy.begin();
- tablet_id != tablet_ids_to_copy.end();) {
- if (!ContainsKey(src_tablet_ids_set, *tablet_id)) {
- LOG(ERROR) << Substitute("Tablet $0 copy failed: not found in source filesystem.",
- *tablet_id);
- InsertOrDie(&failed_tablet_ids, *tablet_id);
- tablet_id = tablet_ids_to_copy.erase(tablet_id);
- } else {
- tablet_id++;
- }
- }
-
- // Create a thread to obtain copy process periodically.
- CountDownLatch latch(1);
- scoped_refptr<Thread> check_thread;
- RETURN_NOT_OK(Thread::Create("tool-tablet-copy", "check-progress",
- [&] () {
- while (!latch.WaitFor(MonoDelta::FromSeconds(10))) {
- std::lock_guard<simple_spinlock> l(lock);
- for (const auto& copying_replica : copying_replicas) {
- LOG(INFO) << Substitute("Tablet $0 copy status: $1",
- copying_replica->tablet_id(),
- copying_replica->last_status());
- }
- }
- }, &check_thread));
-
- // Init TabletCopyClientMetrics.
- MetricRegistry metric_registry;
- scoped_refptr<MetricEntity> metric_entity(
- METRIC_ENTITY_server.Instantiate(&metric_registry, "tool-tablet-copy"));
- TabletCopyClientMetrics tablet_copy_client_metrics(metric_entity);
-
- // Create a thread pool to copy tablets.
- std::unique_ptr<ThreadPool> copy_pool;
- ThreadPoolBuilder("tool-tablet-copy-pool")
- .set_max_threads(FLAGS_num_threads)
- .set_min_threads(FLAGS_num_threads)
- .Build(&copy_pool);
-
- // Start to copy tablets.
- for (const auto& tablet_id : tablet_ids_to_copy) {
- RETURN_NOT_OK(copy_pool->Submit([&]() {
- // 'fake_replica' is used for checking copy progress only.
- scoped_refptr<TabletReplica> fake_replica(new TabletReplica());
- {
- std::lock_guard<simple_spinlock> l(lock);
- LOG(WARNING) << "Start to copy tablet " << tablet_id;
- InsertOrDie(&copying_replicas, fake_replica.get());
- }
-
- LocalTabletCopyClient client(tablet_id,
- &dst_fs_manager,
- dst_cmeta_manager,
- /* messenger */ nullptr,
- &tablet_copy_client_metrics,
- &src_fs_manager,
- /* tablet_copy_source_metrics */ nullptr);
- Status s = client.Start(tablet_id, /* meta */ nullptr).AndThen([&] {
- return client.FetchAll(fake_replica);
- }).AndThen([&] {
- return client.Finish();
- });
-
- {
- std::lock_guard<simple_spinlock> l(lock);
- if (!s.ok()) {
- InsertOrDie(&failed_tablet_ids, tablet_id);
- LOG(ERROR) << Substitute("Tablet $0 copy failed: $1.", tablet_id, s.ToString());
- } else {
- succeed_tablet_count++;
- LOG(INFO) << Substitute("Tablet $0 copy succeed.", tablet_id);
- }
- copying_replicas.erase(fake_replica.get());
-
- LOG(INFO) << Substitute("$0/$1 tablets, $2 bytes copied, include $3 failed tablets.",
- succeed_tablet_count + failed_tablet_ids.size(),
- total_tablet_count,
- tablet_copy_client_metrics.bytes_fetched->value(),
- failed_tablet_ids.size());
- }
- }));
- }
-
- copy_pool->Wait();
- copy_pool->Shutdown();
- latch.CountDown();
- check_thread->Join();
-
- return Status::OK();
+ TabletCopier copier(move(tablet_ids_to_copy),
+ &dst_fs_manager,
+ move(dst_cmeta_manager),
+ &src_fs_manager,
+ move(src_tablet_ids_set));
+ return copier.CopyTablets();
}
Status DeleteLocalReplica(const string& tablet_id,
@@ -1109,14 +1171,15 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
unique_ptr<Action> copy_from_remote =
ActionBuilder("copy_from_remote", &CopyFromRemote)
- .Description("Copy a tablet replica from a remote server")
- .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+ .Description("Copy tablet replicas from a remote server")
+ .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc })
.AddRequiredParameter({ "source", "Source RPC address of "
"form hostname:port" })
.AddOptionalParameter("fs_data_dirs")
.AddOptionalParameter("fs_metadata_dir")
.AddOptionalParameter("fs_wal_dir")
.AddOptionalParameter("tablet_copy_download_threads_nums_per_session")
+ .AddOptionalParameter("num_threads")
.Build();
unique_ptr<Action> copy_from_local =