You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2021/08/27 06:56:16 UTC

[kudu] 02/02: [tool] Support to rewrite and print cmeta by batch

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 47450aa5b637a8f273636b0d8d6496e82a0dc4fe
Author: Yingchun Lai <la...@sensorsdata.cn>
AuthorDate: Sat Aug 21 15:55:11 2021 +0800

    [tool] Support to rewrite and print cmeta by batch
    
    Both print and rewrite cmeta is operate on a single tablet, it will
    cost a very long time if there are thousands of block containers
    to rewrite. In some cases we need to rewrite a batch of raft configs
    (e.g. 2 tservers permanently failed in a 3 tservers cluster).
    This patch adds support to open FsManager once and print or rewrite
    cmeta by batch.
    
    Change-Id: I33dcbf5704bce4265eae0bed0f4296d17e352780
    Reviewed-on: http://gerrit.cloudera.org:8080/17804
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/master_failover-itest.cc     |  7 +-
 src/kudu/tools/kudu-tool-test.cc                   | 89 +++++++++++++++------
 src/kudu/tools/tool_action_local_replica.cc        | 92 +++++++++++++---------
 3 files changed, 128 insertions(+), 60 deletions(-)

diff --git a/src/kudu/integration-tests/master_failover-itest.cc b/src/kudu/integration-tests/master_failover-itest.cc
index 1a46fe3..95ea9a2 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -62,6 +62,7 @@ using std::set;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::SkipWhitespace;
 using strings::Split;
 using strings::Substitute;
 
@@ -400,8 +401,10 @@ TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
       string output;
       ASSERT_OK(Subprocess::Call(args, "", &output));
       StripWhiteSpace(&output);
-      LOG(INFO) << "UUIDS: " << output;
-      set<string> uuids = Split(output, " ");
+      LOG(INFO) << output;
+      vector<string> sections = Split(output, "peers: ", SkipWhitespace());
+      ASSERT_EQ(2, sections.size());
+      set<string> uuids = Split(sections[1], " ");
 
       // Isolate the failed master's UUID by eliminating the UUIDs of the
       // healthy masters from the set.
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index a9b3f18..acf978e 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3417,49 +3417,71 @@ TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) {
 
 // Test for 'local_replica cmeta' functionality.
 TEST_F(ToolTest, TestLocalReplicaCMetaOps) {
-  NO_FATALS(StartMiniCluster());
+  const int kNumTablets = 4;
+  const int kNumTabletServers = 3;
+
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTabletServers;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
 
   // TestWorkLoad.Setup() internally generates a table.
   TestWorkload workload(mini_cluster_.get());
-  workload.set_num_replicas(1);
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_replicas(3);
   workload.Setup();
-  MiniTabletServer* ts = mini_cluster_->mini_tablet_server(0);
-  const string ts_uuid = ts->uuid();
-  const string& flags = Substitute("-fs-wal-dir $0", ts->options()->fs_opts.wal_root);
-  string tablet_id;
+
+  vector<string> ts_uuids;
+  for (int i = 0; i < kNumTabletServers; ++i) {
+    ts_uuids.emplace_back(mini_cluster_->mini_tablet_server(i)->uuid());
+  }
+  const string& flags =
+      Substitute("-fs-wal-dir $0",
+                 mini_cluster_->mini_tablet_server(0)->options()->fs_opts.wal_root);
+  vector<string> tablet_ids;
   {
-    vector<string> tablets;
-    NO_FATALS(RunActionStdoutLines(Substitute("local_replica list $0", flags), &tablets));
-    ASSERT_EQ(1, tablets.size());
-    tablet_id = tablets[0];
+    NO_FATALS(RunActionStdoutLines(Substitute("local_replica list $0", flags), &tablet_ids));
+    ASSERT_EQ(kNumTablets, tablet_ids.size());
+  }
+  vector<string> cmeta_paths;
+  for (const auto& tablet_id : tablet_ids) {
+    cmeta_paths.emplace_back(mini_cluster_->mini_tablet_server(0)->server()->
+        fs_manager()->GetConsensusMetadataPath(tablet_id));
+  }
+  const string& ts_host_port = mini_cluster_->mini_tablet_server(0)->bound_rpc_addr().ToString();
+  for (int i = 0; i < kNumTabletServers; ++i) {
+    mini_cluster_->mini_tablet_server(i)->Shutdown();
   }
-  const auto& cmeta_path = ts->server()->fs_manager()->GetConsensusMetadataPath(tablet_id);
-
-  ts->Shutdown();
 
   // Test print_replica_uuids.
-  // We only have a single replica, so we expect one line, with our server's UUID.
+  // We have kNumTablets replicas, so we expect kNumTablets lines, with our 3 servers' UUIDs.
   {
-    vector<string> uuids;
+    vector<string> lines;
     NO_FATALS(RunActionStdoutLines(Substitute("local_replica cmeta print_replica_uuids $0 $1",
-                                               flags, tablet_id), &uuids));
-    ASSERT_EQ(1, uuids.size());
-    EXPECT_EQ(ts_uuid, uuids[0]);
+                                              flags, JoinStrings(tablet_ids, ",")), &lines));
+    ASSERT_EQ(kNumTablets, lines.size());
+    for (int i = 0; i < lines.size(); ++i) {
+      ASSERT_STR_MATCHES(lines[i],
+                         Substitute("tablet: $0, peers:( $1| $2| $3){3}",
+                                    tablet_ids[i], ts_uuids[0], ts_uuids[1], ts_uuids[2]));
+      ASSERT_STR_CONTAINS(lines[i], ts_uuids[0]);
+      ASSERT_STR_CONTAINS(lines[i], ts_uuids[1]);
+      ASSERT_STR_CONTAINS(lines[i], ts_uuids[2]);
+    }
   }
 
   // Test using set-term to bump the term to 123.
-  {
+  for (int i = 0; i < tablet_ids.size(); ++i) {
     NO_FATALS(RunActionStdoutNone(Substitute("local_replica cmeta set-term $0 $1 123",
-                                             flags, tablet_id)));
+                                             flags, tablet_ids[i])));
 
     string stdout;
-    NO_FATALS(RunActionStdoutString(Substitute("pbc dump $0", cmeta_path),
+    NO_FATALS(RunActionStdoutString(Substitute("pbc dump $0", cmeta_paths[i]),
                                     &stdout));
     ASSERT_STR_CONTAINS(stdout, "current_term: 123");
   }
 
   // Test that set-term refuses to decrease the term.
-  {
+  for (const auto& tablet_id : tablet_ids) {
     string stdout, stderr;
     Status s = RunTool(Substitute("local_replica cmeta set-term $0 $1 10",
                                   flags, tablet_id),
@@ -3471,6 +3493,29 @@ TEST_F(ToolTest, TestLocalReplicaCMetaOps) {
     EXPECT_THAT(stderr, testing::HasSubstr(
         "specified term 10 must be higher than current term 123"));
   }
+
+  // Test using rewrite_raft_config to set all tablets' raft config with only 1 member.
+  {
+    NO_FATALS(RunActionStdoutNone(
+        Substitute("local_replica cmeta rewrite_raft_config $0 $1 $2:$3",
+                   flags,
+                   JoinStrings(tablet_ids, ","),
+                   ts_uuids[0],
+                   ts_host_port)));
+  }
+
+  // We have kNumTablets replicas, so we expect kNumTablets lines, with our the
+  // first tservers' UUIDs.
+  {
+    vector<string> lines;
+    NO_FATALS(RunActionStdoutLines(Substitute("local_replica cmeta print_replica_uuids $0 $1",
+                                              flags, JoinStrings(tablet_ids, ",")), &lines));
+    ASSERT_EQ(kNumTablets, lines.size());
+    for (int i = 0; i < lines.size(); ++i) {
+      ASSERT_STR_MATCHES(lines[i], Substitute("tablet: $0, peers: $1",
+                                              tablet_ids[i], ts_uuids[0]));
+    }
+  }
 }
 
 TEST_F(ToolTest, TestTserverList) {
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 2c99e04..0540abb 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -23,7 +23,6 @@
 #include <map>
 #include <memory>
 #include <string>
-#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -253,19 +252,27 @@ Status ParsePeerString(const string& peer_str,
 }
 
 Status PrintReplicaUuids(const RunnerContext& context) {
+  const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg);
+  vector<string> tablet_ids = strings::Split(tablet_ids_str, ",", strings::SkipEmpty());
+  if (tablet_ids.empty()) {
+    return Status::InvalidArgument("no tablet identifiers provided");
+  }
+
   unique_ptr<FsManager> fs_manager;
   RETURN_NOT_OK(FsInit(/*skip_block_manager*/true, &fs_manager));
   scoped_refptr<ConsensusMetadataManager> cmeta_manager(
       new ConsensusMetadataManager(fs_manager.get()));
 
-  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
-
-  // Load the cmeta file and print all peer uuids.
-  scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
-  cout << JoinMapped(cmeta->CommittedConfig().peers(),
-                     [](const RaftPeerPB& p){ return p.permanent_uuid(); },
-                     " ") << endl;
+  for (const auto& tablet_id : tablet_ids) {
+    // Load the cmeta file and print all peer uuids.
+    scoped_refptr<ConsensusMetadata> cmeta;
+    RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
+    cout << "tablet: " << tablet_id << ", peers: "
+         << JoinMapped(cmeta->CommittedConfig().peers(),
+                       [](const RaftPeerPB& p) { return p.permanent_uuid(); },
+                       " ")
+         << endl;
+  }
   return Status::OK();
 }
 
@@ -283,9 +290,17 @@ Status BackupConsensusMetadata(FsManager* fs_manager,
 }
 
 Status RewriteRaftConfig(const RunnerContext& context) {
-  // Parse tablet ID argument.
-  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
-  if (tablet_id != master::SysCatalogTable::kSysCatalogTabletId) {
+  const string& tablet_ids_str = FindOrDie(context.required_args, kTabletIdsCsvArg);
+  vector<string> tablet_ids = strings::Split(tablet_ids_str, ",", strings::SkipEmpty());
+  if (tablet_ids.empty()) {
+    return Status::InvalidArgument("no tablet identifiers provided");
+  }
+
+  const auto& found = find_if_not(tablet_ids.begin(), tablet_ids.end(),
+                                  [&] (const string& value) {
+                                    return value == master::SysCatalogTable::kSysCatalogTabletId;
+                                  });
+  if (found != tablet_ids.end()) {
     LOG(WARNING) << "Master will not notice rewritten Raft config of regular "
                  << "tablets. A regular Raft config change must occur.";
   }
@@ -294,37 +309,42 @@ Status RewriteRaftConfig(const RunnerContext& context) {
   vector<pair<string, HostPort>> peers;
   for (const auto& arg : context.variadic_args) {
     pair<string, HostPort> parsed_peer;
-    RETURN_NOT_OK(ParsePeerString(arg,
-                                  &parsed_peer.first, &parsed_peer.second));
+    RETURN_NOT_OK(ParsePeerString(arg, &parsed_peer.first, &parsed_peer.second));
     peers.push_back(parsed_peer);
   }
   DCHECK(!peers.empty());
 
-  // Make a copy of the old file before rewriting it.
   Env* env = Env::Default();
   FsManagerOpts fs_opts = FsManagerOpts();
   fs_opts.skip_block_manager = true;
   FsManager fs_manager(env, std::move(fs_opts));
   RETURN_NOT_OK(fs_manager.Open());
-  RETURN_NOT_OK(BackupConsensusMetadata(&fs_manager, tablet_id));
-
-  // Load the cmeta file and rewrite the raft config.
-  scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
-  scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
-  RaftConfigPB current_config = cmeta->CommittedConfig();
-  RaftConfigPB new_config = current_config;
-  new_config.clear_peers();
-  for (const auto& p : peers) {
-    RaftPeerPB new_peer;
-    new_peer.set_member_type(RaftPeerPB::VOTER);
-    new_peer.set_permanent_uuid(p.first);
-    HostPortPB new_peer_host_port_pb = HostPortToPB(p.second);
-    new_peer.mutable_last_known_addr()->CopyFrom(new_peer_host_port_pb);
-    new_config.add_peers()->CopyFrom(new_peer);
-  }
-  cmeta->set_committed_config(new_config);
-  return cmeta->Flush();
+  for (const auto& tablet_id : tablet_ids) {
+    LOG(INFO) << Substitute("Rewriting Raft config of tablet: $0", tablet_id);
+
+    // Make a copy of the old file before rewriting it.
+    RETURN_NOT_OK(BackupConsensusMetadata(&fs_manager, tablet_id));
+
+    // Load the cmeta file and rewrite the raft config.
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+        new ConsensusMetadataManager(&fs_manager));
+    scoped_refptr<ConsensusMetadata> cmeta;
+    RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
+    RaftConfigPB current_config = cmeta->CommittedConfig();
+    RaftConfigPB new_config = current_config;
+    new_config.clear_peers();
+    for (const auto& p : peers) {
+      RaftPeerPB new_peer;
+      new_peer.set_member_type(RaftPeerPB::VOTER);
+      new_peer.set_permanent_uuid(p.first);
+      HostPortPB new_peer_host_port_pb = HostPortToPB(p.second);
+      new_peer.mutable_last_known_addr()->CopyFrom(new_peer_host_port_pb);
+      new_config.add_peers()->CopyFrom(new_peer);
+    }
+    cmeta->set_committed_config(new_config);
+    RETURN_NOT_OK(cmeta->Flush());
+  }
+  return Status::OK();
 }
 
 Status SetRaftTerm(const RunnerContext& context) {
@@ -884,7 +904,7 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
       ActionBuilder("print_replica_uuids", &PrintReplicaUuids)
       .Description("Print all tablet replica peer UUIDs found in a "
                    "tablet's Raft configuration")
-      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc })
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("fs_metadata_dir")
       .AddOptionalParameter("fs_wal_dir")
@@ -893,7 +913,7 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
   unique_ptr<Action> rewrite_raft_config =
       ActionBuilder("rewrite_raft_config", &RewriteRaftConfig)
       .Description("Rewrite a tablet replica's Raft configuration")
-      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddRequiredParameter({ kTabletIdsCsvArg, kTabletIdsCsvArgDesc })
       .AddRequiredVariadicParameter({ kRaftPeersArg, kRaftPeersArgDesc })
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("fs_metadata_dir")