You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by zh...@apache.org on 2022/05/19 09:43:20 UTC

[kudu] branch master updated: [Tools] Set all masters/tservers flags in a cluster in batch mode

This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 54855326b [Tools] Set all masters/tservers flags in a cluster in batch mode
54855326b is described below

commit 54855326b66dea649215702dc936988638205345
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Mon May 9 19:48:42 2022 +0800

    [Tools] Set all masters/tservers flags in a cluster in batch mode
    
    Currently, Kudu can only set a flag on one master/tserver at a time.
    This patch sets a flag on all masters/tservers in a cluster in batch mode.
    Usage:
    kudu master set_flag_for_all <master_addresses> <flag> <value>
    kudu tserver set_flag_for_all <master_addresses> <flag> <value>
    
    Change-Id: I33ce35389590c362d6167c677a7a3a28e08c040f
    Reviewed-on: http://gerrit.cloudera.org:8080/18506
    Tested-by: Kudu Jenkins
    Reviewed-by: Yifan Zhang <ch...@163.com>
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/tools/kudu-tool-test.cc      | 76 ++++++++++++++++++++++++++++++++++-
 src/kudu/tools/tool_action_master.cc  | 60 +++++++++++++++++++++++++++
 src/kudu/tools/tool_action_tserver.cc | 44 ++++++++++++++++++++
 3 files changed, 178 insertions(+), 2 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 0d1de64e6..9fabb315b 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstring>
 #include <sys/stat.h>
 
 #include <algorithm>
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
+#include <cstring>
 #include <fstream>
 #include <functional>
 #include <initializer_list>
@@ -42,6 +42,7 @@
 #include <glog/stl_logging.h>
 #include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
+#include <rapidjson/document.h>
 
 #include "kudu/cfile/cfile-test-base.h"
 #include "kudu/cfile/cfile_util.h"
@@ -1187,17 +1188,19 @@ TEST_F(ToolTest, TestModeHelp) {
         "get_flags.*Get the gflags",
         "run.*Run a Kudu Master",
         "set_flag.*Change a gflag value",
+        "set_flag_for_all.*Change a gflag value",
         "status.*Get the status",
         "timestamp.*Get the current timestamp",
         "list.*List masters in a Kudu cluster",
         "add.*Add a master to the Kudu cluster",
-        "remove.*Remove a master from the Kudu cluster"
+        "remove.*Remove a master from the Kudu cluster",
     };
     NO_FATALS(RunTestHelp(kCmd, kMasterModeRegexes));
     NO_FATALS(RunTestHelpRpcFlags(kCmd,
         { "dump_memtrackers",
           "get_flags",
           "set_flag",
+          "set_flag_for_all",
           "status",
           "timestamp",
           "list",
@@ -1361,6 +1364,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "dump_memtrackers.*Dump the memtrackers",
         "get_flags.*Get the gflags",
         "set_flag.*Change a gflag value",
+        "set_flag_for_all.*Change a gflag value",
         "run.*Run a Kudu Tablet Server",
         "state.*Operate on the state",
         "status.*Get the status",
@@ -1373,6 +1377,7 @@ TEST_F(ToolTest, TestModeHelp) {
         { "dump_memtrackers",
           "get_flags",
           "set_flag",
+          "set_flag_for_all",
           "status",
           "timestamp",
           "list",
@@ -8095,5 +8100,72 @@ TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
   });
 }
 
+// Value-parameterized fixture built on ToolTest. The bool parameter is read by
+// the test body through GetParam(): true targets masters, false tablet servers.
+class SetFlagForAllTest : public ToolTest, public ::testing::WithParamInterface<bool> {
+};
+
+// Verifies 'set_flag_for_all' on masters (param=true) and on tablet servers (param=false).
+INSTANTIATE_TEST_SUITE_P(IsMaster, SetFlagForAllTest, ::testing::Bool());
+TEST_P(SetFlagForAllTest, TestSetFlagForAll) {
+  const auto is_master = GetParam();
+  const string role = is_master ? "master" : "tserver";
+  // Record the server counts now: 'opts' is moved from when the cluster is started.
+  ExternalMiniClusterOptions opts;
+  if (is_master) {
+    opts.num_masters = 3;
+  } else {
+    opts.num_tablet_servers = 3;
+  }
+  const int num_masters = opts.num_masters;
+  const int hosts_num = is_master ? opts.num_masters : opts.num_tablet_servers;
+  NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+
+  vector<string> master_addresses;
+  for (int i = 0; i < num_masters; i++) {
+    master_addresses.emplace_back(cluster_->master(i)->bound_rpc_addr().ToString());
+  }
+  const string str_master_addresses =
+      JoinMapped(master_addresses, [](const string& addr) { return addr; }, ",");
+  const string flag_key = "max_log_size";
+  const string flag_value = "10";
+  NO_FATALS(RunActionStdoutNone(
+      Substitute("$0 set_flag_for_all $1 $2 $3",
+          role, str_master_addresses, flag_key, flag_value)));
+
+  // Every server of the targeted role must report the new value.
+  for (int i = 0; i < hosts_num; i++) {
+    const string server_addr = is_master
+        ? cluster_->master(i)->bound_rpc_addr().ToString()
+        : cluster_->tablet_server(i)->bound_rpc_addr().ToString();
+    string out;
+    NO_FATALS(RunActionStdoutString(
+        Substitute("$0 get_flags $1 --format=json", role, server_addr), &out));
+    rapidjson::Document doc;
+    doc.Parse<0>(out.c_str());
+    ASSERT_TRUE(!doc.HasParseError() && doc.IsArray()) << out;
+    bool flag_found = false;
+    for (rapidjson::SizeType j = 0; j < doc.Size(); j++) {
+      const rapidjson::Value& item = doc[j];
+      ASSERT_TRUE(item["flag"].IsString());
+      if (item["flag"].GetString() == flag_key) {
+        ASSERT_TRUE(item["value"].IsString());
+        ASSERT_EQ(flag_value, item["value"].GetString());
+        flag_found = true;
+        break;  // Done with this server only; the remaining checks must still run.
+      }
+    }
+    ASSERT_TRUE(flag_found) << flag_key << " is missing from: " << out;
+  }
+
+  // Setting a non-existent flag must fail. Check the tool's stderr for the error text.
+  string err;
+  Status s = RunTool(
+      Substitute("$0 set_flag_for_all $1 test_flag test_value", role, str_master_addresses),
+      nullptr, &err, nullptr, nullptr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(err, "set flag failed");
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 12a672b65..5739281d5 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -143,6 +143,56 @@ Status MasterSetFlag(const RunnerContext& context) {
   return SetServerFlag(address, Master::kDefaultPort, flag, value);
 }
 
+// Sets the gflag named by the 'kFlagArg' argument to the 'kValueArg' argument on
+// every master returned by the ListMasters RPC. All masters are attempted even if
+// some fail; a RuntimeError is returned afterwards if any master was not updated.
+Status MasterSetAllMasterFlag(const RunnerContext& context) {
+  const string& flag = FindOrDie(context.required_args, kFlagArg);
+  const string& value = FindOrDie(context.required_args, kValueArg);
+
+  LeaderMasterProxy proxy;
+  RETURN_NOT_OK(proxy.Init(context));
+
+  ListMastersRequestPB req;
+  ListMastersResponsePB resp;
+  RETURN_NOT_OK((proxy.SyncRpc<ListMastersRequestPB, ListMastersResponsePB>(
+      req, &resp, "ListMasters", &MasterServiceProxy::ListMastersAsync)));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  const auto hostport_to_string = [](const HostPortPB& hostport) {
+    return Substitute("$0:$1", hostport.host(), hostport.port());
+  };
+
+  // Walk the response in place instead of copying every ServerEntryPB into
+  // temporary vectors first.
+  bool set_failed_flag = false;
+  for (const auto& master : resp.masters()) {
+    if (master.has_error()) {
+      // The flag cannot be set on a master whose info is unavailable, so this
+      // counts as a failure rather than being skipped with only a warning.
+      set_failed_flag = true;
+      LOG(WARNING) << "Failed to retrieve info for master: "
+                   << StatusFromPB(master.error()).ToString();
+      continue;
+    }
+    // NOTE(review): all RPC addresses of a master are joined into a single
+    // comma-separated string; confirm SetServerFlag() accepts that form.
+    const string addr = JoinMapped(master.registration().rpc_addresses(),
+                                   hostport_to_string, ",");
+    const Status s = SetServerFlag(addr, Master::kDefaultPort, flag, value);
+    if (!s.ok()) {
+      set_failed_flag = true;
+      LOG(WARNING) << Substitute("Set config {$0:$1} for $2 failed, error message: $3",
+                                 flag, value, addr, s.ToString());
+    }
+  }
+  if (set_failed_flag) {
+    return Status::RuntimeError("Some Masters set flag failed!");
+  }
+  return Status::OK();
+}
+
 Status MasterStatus(const RunnerContext& context) {
   const string& address = FindOrDie(context.required_args, kMasterAddressArg);
   return PrintServerStatus(address, Master::kDefaultPort);
@@ -769,6 +819,16 @@ unique_ptr<Mode> BuildMasterMode() {
         .Build();
     builder.AddAction(std::move(set_flag));
   }
+  {
+    unique_ptr<Action> set_flag_for_all =
+        ClusterActionBuilder("set_flag_for_all", &MasterSetAllMasterFlag)
+        .Description("Change a gflag value for all Kudu Masters in the cluster")
+        .AddRequiredParameter({ kFlagArg, "Name of the gflag" })
+        .AddRequiredParameter({ kValueArg, "New value for the gflag" })
+        .AddOptionalParameter("force")
+        .Build();
+    builder.AddAction(std::move(set_flag_for_all));
+  }
   {
     unique_ptr<Action> status =
         MasterActionBuilder("status", &MasterStatus)
diff --git a/src/kudu/tools/tool_action_tserver.cc b/src/kudu/tools/tool_action_tserver.cc
index 7947395d6..4134c9d8f 100644
--- a/src/kudu/tools/tool_action_tserver.cc
+++ b/src/kudu/tools/tool_action_tserver.cc
@@ -123,6 +123,42 @@ Status TServerSetFlag(const RunnerContext& context) {
                        flag, value);
 }
 
+// Applies the gflag named by the 'kFlagArg' argument, with the 'kValueArg' value,
+// to each tablet server listed by the ListTabletServers RPC. A failure on one
+// server is logged and does not stop the loop; if any server failed, a
+// RuntimeError is returned at the end.
+Status TServerSetAllTServersFlag(const RunnerContext& context) {
+  const string& flag_name = FindOrDie(context.required_args, kFlagArg);
+  const string& flag_value = FindOrDie(context.required_args, kValueArg);
+
+  LeaderMasterProxy master_proxy;
+  RETURN_NOT_OK(master_proxy.Init(context));
+
+  ListTabletServersRequestPB list_req;
+  ListTabletServersResponsePB list_resp;
+  RETURN_NOT_OK((master_proxy.SyncRpc<ListTabletServersRequestPB, ListTabletServersResponsePB>(
+      list_req, &list_resp, "ListTabletServers", &MasterServiceProxy::ListTabletServersAsync)));
+  if (list_resp.has_error()) {
+    return StatusFromPB(list_resp.error().status());
+  }
+
+  const auto format_hostport = [](const HostPortPB& hp) {
+    return Substitute("$0:$1", hp.host(), hp.port());
+  };
+  bool any_failed = false;
+  for (const auto& ts : list_resp.servers()) {
+    const string ts_addr = JoinMapped(ts.registration().rpc_addresses(), format_hostport, ",");
+    const Status set_status =
+        SetServerFlag(ts_addr, tserver::TabletServer::kDefaultPort, flag_name, flag_value);
+    if (set_status.ok()) continue;
+    any_failed = true;
+    LOG(WARNING) << Substitute("Set config {$0:$1} for $2 failed, error message: $3",
+                               flag_name, flag_value, ts_addr, set_status.ToString());
+  }
+  return any_failed ? Status::RuntimeError("Some Tablet Servers set flag failed!")
+                    : Status::OK();
+}
+
 Status TServerStatus(const RunnerContext& context) {
   const string& address = FindOrDie(context.required_args, kTServerAddressArg);
   return PrintServerStatus(address, tserver::TabletServer::kDefaultPort);
@@ -389,6 +425,13 @@ unique_ptr<Mode> BuildTServerMode() {
       .AddOptionalParameter("force")
       .Build();
 
+  unique_ptr<Action> set_flag_for_all =
+      ClusterActionBuilder("set_flag_for_all", &TServerSetAllTServersFlag)
+      .Description("Change a gflag value for all Kudu Tablet Servers in the cluster")
+      .AddRequiredParameter({ kFlagArg, "Name of the gflag" })
+      .AddRequiredParameter({ kValueArg, "New value for the gflag" })
+      .Build();
+
   unique_ptr<Action> status =
       TServerActionBuilder("status", &TServerStatus)
       .Description("Get the status of a Kudu Tablet Server")
@@ -471,6 +514,7 @@ unique_ptr<Mode> BuildTServerMode() {
       .AddAction(std::move(get_flags))
       .AddAction(std::move(run))
       .AddAction(std::move(set_flag))
+      .AddAction(std::move(set_flag_for_all))
       .AddAction(std::move(status))
       .AddAction(std::move(timestamp))
       .AddAction(std::move(list_tservers))