Posted to commits@kudu.apache.org by ab...@apache.org on 2020/03/04 09:35:10 UTC

[kudu] branch master updated (c3122b6 -> bc2efa1)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from c3122b6  [ksck] field renamings: flags_ --> unusual_flags_
     new f4176b1  subprocess: plumb Java metrics into C++
     new bc2efa1  ksck: display quiecing-related info

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tablet_server_quiescing-itest.cc               |  57 +++++--
 src/kudu/rebalance/cluster_status.h                |   8 +
 src/kudu/subprocess/CMakeLists.txt                 |   1 +
 src/kudu/subprocess/subprocess_proxy-test.cc       | 167 +++++++++++++++++++++
 src/kudu/subprocess/subprocess_proxy.h             | 102 +++++++++++++
 src/kudu/tools/ksck-test.cc                        |   2 +
 src/kudu/tools/ksck.cc                             |   1 +
 src/kudu/tools/ksck.h                              |  14 ++
 src/kudu/tools/ksck_remote.cc                      |  32 +++-
 src/kudu/tools/ksck_remote.h                       |   4 +
 src/kudu/tools/ksck_results.cc                     |  35 +++++
 src/kudu/tools/tool_action_cluster.cc              |   4 +-
 src/kudu/tools/tool_action_tserver.cc              |   5 +-
 src/kudu/tserver/tablet_service.cc                 |  12 ++
 src/kudu/tserver/tablet_service.h                  |   6 +-
 src/kudu/tserver/tserver.proto                     |   1 +
 16 files changed, 434 insertions(+), 17 deletions(-)
 create mode 100644 src/kudu/subprocess/subprocess_proxy-test.cc
 create mode 100644 src/kudu/subprocess/subprocess_proxy.h


[kudu] 02/02: ksck: display quiecing-related info

Posted by ab...@apache.org.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit bc2efa1ae97fcc4e592c93d38592206d48d6f8f5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Feb 28 00:15:33 2020 -0800

    ksck: display quiecing-related info
    
    This patch adds quiescing-related info to ksck's "Tablet Server Summary"
    section. Specifically, it displays the quiescing state, the number of
    tablet leaders, and the number of active scanners[1].
    
    If none of the tablet servers are quiescing, the quiescing state column
    is omitted. If none of the tablet servers support the quiescing RPC, all
    related columns are omitted.
    
    I manually tested against a cluster in which no tablet server supports
    quiescing, as well as one that only partially supports quiescing[2].
    
    The info is displayed by default with ksck, since the information may be
    invaluable in debugging performance or workload skew. The info can be
    omitted by setting `--quiescing_info` to false.
    
    [1] Sample output:
    Tablet Server Summary
                   UUID               |            Address             | Status  | Location | Quiescing | Tablet Leaders | Active Scanners
    ----------------------------------+--------------------------------+---------+----------+-----------+----------------+-----------------
     1e8c8c55d0e24110b29caaecdae491ca | ve1318.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       2        |       0
     36e8894c4e6d48c690f64ade8b5fe52d | ve1320.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       7        |       0
     629bbaecfead49f18247d7963cfa98af | ve1319.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       0        |       0
     9dfdd5aac2814353bd50cefca2d77403 | ve1321.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       8        |       0
     9fe2954950ea4f4eaecc4ef97c6eb44a | ve1317.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       6        |       0
     a5dd443f61464c34aca585a905e87926 | ve1322.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       7        |       0
     dffda2ef2d33481993d29009f3f87420 | ve1323.halxg.cloudera.com:7050 | HEALTHY | /default | true      |       6        |       0
     e6c9b1df642a4cf69c47f36480dd4723 | ve1316.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       6        |       0
     efc1275241604b0aa886494f8da9e00b | ve1324.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       7        |       0
    
    [2] Partial support for quiescing across the cluster yields "partial"
        results; not the prettiest, but it's also not a scenario we expect often:
    W0228 18:36:40.200479 383527 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server 629bbaecfead49f18247d7963cfa98af (ve1319.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.200585 383525 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server 1e8c8c55d0e24110b29caaecdae491ca (ve1318.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.201057 383526 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server 36e8894c4e6d48c690f64ade8b5fe52d (ve1320.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.202527 383528 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server 9dfdd5aac2814353bd50cefca2d77403 (ve1321.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.202736 383530 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server a5dd443f61464c34aca585a905e87926 (ve1322.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.202940 383532 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server dffda2ef2d33481993d29009f3f87420 (ve1323.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    W0228 18:36:40.203280 383536 ksck_remote.cc:290] Couldn't fetch quiescing info from tablet server efc1275241604b0aa886494f8da9e00b (ve1324.halxg.cloudera.com:7050): Remote error: unsupported feature flags
    ...
    Tablet Server Summary
                   UUID               |            Address             | Status  | Location | Quiescing | Tablet Leaders | Active Scanners
    ----------------------------------+--------------------------------+---------+----------+-----------+----------------+-----------------
     1e8c8c55d0e24110b29caaecdae491ca | ve1318.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     36e8894c4e6d48c690f64ade8b5fe52d | ve1320.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     629bbaecfead49f18247d7963cfa98af | ve1319.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     9dfdd5aac2814353bd50cefca2d77403 | ve1321.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     9fe2954950ea4f4eaecc4ef97c6eb44a | ve1317.halxg.cloudera.com:7050 | HEALTHY | /default | true      |       5        |       0
     a5dd443f61464c34aca585a905e87926 | ve1322.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     dffda2ef2d33481993d29009f3f87420 | ve1323.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
     e6c9b1df642a4cf69c47f36480dd4723 | ve1316.halxg.cloudera.com:7050 | HEALTHY | /default | false     |       6        |       0
     efc1275241604b0aa886494f8da9e00b | ve1324.halxg.cloudera.com:7050 | HEALTHY | /default | n/a       | n/a            | n/a
    
    Change-Id: Ibdc650eb3ee30e8993330f2cbd389076ea2bad49
    Reviewed-on: http://gerrit.cloudera.org:8080/15323
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../tablet_server_quiescing-itest.cc               | 57 ++++++++++++++++++----
 src/kudu/rebalance/cluster_status.h                |  8 +++
 src/kudu/tools/ksck-test.cc                        |  2 +
 src/kudu/tools/ksck.cc                             |  1 +
 src/kudu/tools/ksck.h                              | 14 ++++++
 src/kudu/tools/ksck_remote.cc                      | 32 +++++++++++-
 src/kudu/tools/ksck_remote.h                       |  4 ++
 src/kudu/tools/ksck_results.cc                     | 35 +++++++++++++
 src/kudu/tools/tool_action_cluster.cc              |  4 +-
 src/kudu/tools/tool_action_tserver.cc              |  5 +-
 src/kudu/tserver/tablet_service.cc                 | 12 +++++
 src/kudu/tserver/tablet_service.h                  |  6 +--
 src/kudu/tserver/tserver.proto                     |  1 +
 13 files changed, 164 insertions(+), 17 deletions(-)
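
The column-selection rule described in the commit message above (drop every
quiescing-related column when no server returned info, and drop only the
"Quiescing" column when no server is quiescing) can be sketched in isolation.
This is a minimal, self-contained illustration, not the code from
ksck_results.cc: `ServerSummary`, `has_quiescing_info`, and `ExtraColumns` are
hypothetical names, and a plain bool flag stands in for the boost::optional
used in the patch.

```cpp
#include <string>
#include <vector>

// Mirrors the fields of the QuiescingInfo struct added by the patch.
struct QuiescingInfo {
  bool is_quiescing;
  int num_leaders;
  int num_active_scanners;
};

// Hypothetical, simplified per-server summary. 'has_quiescing_info' is false
// when the fetch failed or was skipped (the "n/a" rows in the sample output).
struct ServerSummary {
  std::string uuid;
  bool has_quiescing_info;
  QuiescingInfo quiescing_info;
};

// Returns the headers of the extra columns the summary table would carry.
std::vector<std::string> ExtraColumns(const std::vector<ServerSummary>& summaries) {
  bool has_info = false;
  bool has_quiescing_server = false;
  for (const auto& s : summaries) {
    if (!s.has_quiescing_info) continue;
    has_info = true;
    if (s.quiescing_info.is_quiescing) has_quiescing_server = true;
  }
  std::vector<std::string> columns;
  // No server returned quiescing info: omit all related columns.
  if (!has_info) return columns;
  // Only show the state column if at least one server is quiescing.
  if (has_quiescing_server) columns.emplace_back("Quiescing");
  columns.emplace_back("Tablet Leaders");
  columns.emplace_back("Active Scanners");
  return columns;
}
```

With this rule, a cluster where every fetch failed yields no extra columns, and
a healthy cluster with no quiescing servers yields only the two count columns.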

diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index 75352d4..921915c 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -36,6 +36,7 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tools/tool_test_util.h"
@@ -377,8 +378,10 @@ TEST_F(TServerQuiescingITest, TestQuiescingToolBasics) {
   auto* ts = cluster_->mini_tablet_server(0);
   auto rw_workload = CreateFaultIntolerantRWWorkload();
   rw_workload->Setup();
-  rw_workload->set_num_read_threads(1);
+  // Spawn a bunch of read threads so we'll be more likely to see scanners.
+  rw_workload->set_num_read_threads(10);
   ASSERT_FALSE(ts->server()->quiescing());
+  const auto& master_addr = cluster_->mini_master()->bound_rpc_addr().ToString();
   // First, call the start tool a couple of times.
   for (int i = 0; i < 2; i++) {
     ASSERT_OK(RunActionPrependStdoutStderr(
@@ -391,9 +394,17 @@ TEST_F(TServerQuiescingITest, TestQuiescingToolBasics) {
     ASSERT_OK(RunKuduTool({ "tserver", "quiesce", "status", ts->bound_rpc_addr().ToString() },
                           &stdout));
     ASSERT_STR_CONTAINS(stdout,
-        " Quiescing | Tablet leader count | Active scanner count\n"
-        "-----------+---------------------+----------------------\n"
-        " true      |       1             |       0");
+        " Quiescing | Tablet Leaders | Active Scanners\n"
+        "-----------+----------------+-----------------\n"
+        " true      |       1        |       0");
+    ASSERT_TRUE(ts->server()->quiescing());
+
+    // Same with ksck.
+    ASSERT_OK(RunKuduTool({ "cluster", "ksck", master_addr }, &stdout));
+    ASSERT_STR_MATCHES(stdout,
+        ".* Quiescing | Tablet Leaders | Active Scanners\n"
+        ".*-----------+----------------+-----------------\n"
+        ".* true      |       1        |      0");
     ASSERT_TRUE(ts->server()->quiescing());
   }
   ASSERT_OK(RunActionPrependStdoutStderr(
@@ -402,20 +413,46 @@ TEST_F(TServerQuiescingITest, TestQuiescingToolBasics) {
   ASSERT_OK(RunKuduTool({ "tserver", "quiesce", "status", ts->bound_rpc_addr().ToString() },
                         &stdout));
   ASSERT_STR_CONTAINS(stdout,
-      " Quiescing | Tablet leader count | Active scanner count\n"
-      "-----------+---------------------+----------------------\n"
-      " false     |       1             |       0");
+      " Quiescing | Tablet Leaders | Active Scanners\n"
+      "-----------+----------------+-----------------\n"
+      " false     |       1        |       0");
   ASSERT_FALSE(ts->server()->quiescing());
 
+  // When there aren't quiescing tservers, ksck won't report the quiescing
+  // status, but it will still report related info...
+  ASSERT_OK(RunKuduTool({ "cluster", "ksck", master_addr }, &stdout));
+  ASSERT_STR_MATCHES(stdout,
+      ".* Tablet Leaders | Active Scanners\n"
+      ".*----------------+-----------------\n"
+      ".*       1        |      0");
+  ASSERT_STR_NOT_CONTAINS(stdout, "Quiescing");
+  ASSERT_FALSE(ts->server()->quiescing());
+
+  // ... until the user doesn't want to see that.
+  ASSERT_OK(RunKuduTool({ "cluster", "ksck", "--noquiescing_info", master_addr }, &stdout));
+  ASSERT_STR_NOT_CONTAINS(stdout, "Quiescing");
+  ASSERT_STR_NOT_CONTAINS(stdout, "Tablet Leaders");
+  ASSERT_STR_NOT_CONTAINS(stdout, "Active Scanners");
+  ASSERT_FALSE(ts->server()->quiescing());
+
+
   // Now try getting the status with some scanners.
+  // Set a low batch size so we'll be more likely to catch scanners in the act.
+  FLAGS_scanner_default_batch_size_bytes = 1;
   rw_workload->Start();
   ASSERT_EVENTUALLY([&] {
     ASSERT_OK(RunKuduTool({ "tserver", "quiesce", "status", ts->bound_rpc_addr().ToString() },
                           &stdout));
     ASSERT_STR_CONTAINS(stdout, Substitute(
-        " Quiescing | Tablet leader count | Active scanner count\n"
-        "-----------+---------------------+----------------------\n"
-        " false     |       1             |       $0",
+        " Quiescing | Tablet Leaders | Active Scanners\n"
+        "-----------+----------------+-----------------\n"
+        " false     |       1        |       $0",
+        ts->server()->scanner_manager()->CountActiveScanners()));
+    ASSERT_OK(RunKuduTool({ "cluster", "ksck", master_addr }, &stdout));
+    ASSERT_STR_MATCHES(stdout, Substitute(
+        ".* Tablet Leaders | Active Scanners\n"
+        ".*----------------+-----------------\n"
+        ".*       1        |      $0",
         ts->server()->scanner_manager()->CountActiveScanners()));
   });
   ASSERT_FALSE(ts->server()->quiescing());
diff --git a/src/kudu/rebalance/cluster_status.h b/src/kudu/rebalance/cluster_status.h
index 6ca403d..8d6ea8c 100644
--- a/src/kudu/rebalance/cluster_status.h
+++ b/src/kudu/rebalance/cluster_status.h
@@ -129,12 +129,20 @@ enum class ServerHealth {
 // Return a string representation of 'sh'.
 const char* const ServerHealthToString(ServerHealth sh);
 
+// Quiescing-related info.
+struct QuiescingInfo {
+  bool is_quiescing;
+  int num_leaders;
+  int num_active_scanners;
+};
+
 // A summary of a server health check.
 struct ServerHealthSummary {
   std::string uuid;
   std::string address;
   std::string ts_location;
   boost::optional<std::string> version;
+  boost::optional<QuiescingInfo> quiescing_info;
   ServerHealth health = ServerHealth::HEALTHY;
   Status status = Status::OK();
 };
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index c21dd4e..13b669d 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -178,6 +178,8 @@ class MockKsckTabletServer : public KsckTabletServer {
     return Status::OK();
   }
 
+  void FetchQuiescingInfo() override {}
+
   void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
       const Schema& /*schema*/,
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 60e29da..0582de9 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -355,6 +355,7 @@ Status Ksck::FetchInfoFromTabletServers() {
       summary.address = ts->address();
       summary.ts_location = ts->location();
       summary.version = ts->version();
+      summary.quiescing_info = ts->quiescing_info();
       summary.status = s;
       if (!s.ok()) {
         if (IsNotAuthorizedMethodAccess(s)) {
diff --git a/src/kudu/tools/ksck.h b/src/kudu/tools/ksck.h
index d90baf7..2c3b63e 100644
--- a/src/kudu/tools/ksck.h
+++ b/src/kudu/tools/ksck.h
@@ -304,6 +304,11 @@ class KsckTabletServer {
   virtual void FetchCurrentTimestampAsync() = 0;
   virtual Status FetchCurrentTimestamp() = 0;
 
+  // Fetches the quiescing information for the tablet server. This is best
+  // effort; a message is logged if it is unsuccessful (e.g. if the server
+  // doesn't support the quiescing RPC).
+  virtual void FetchQuiescingInfo() = 0;
+
   // Executes a checksum scan on a tablet and reports the result to 'manager'.
   virtual void RunTabletChecksumScanAsync(
                   const std::string& tablet_id,
@@ -354,6 +359,11 @@ class KsckTabletServer {
     return unusual_flags_;
   }
 
+  virtual const boost::optional<cluster_summary::QuiescingInfo>& quiescing_info() const {
+    CHECK_NE(KsckFetchState::UNINITIALIZED, state_);
+    return quiescing_info_;
+  }
+
   uint64_t current_timestamp() const {
     CHECK_EQ(KsckFetchState::FETCHED, state_);
     return timestamp_;
@@ -381,9 +391,13 @@ class KsckTabletServer {
   // unusual_flags_state_ reflects whether the fetch of the non-critical flags
   // info has been done, and if it succeeded or failed.
   KsckFetchState unusual_flags_state_ = KsckFetchState::UNINITIALIZED;
+
   // May be none if flag fetch fails.
   boost::optional<server::GetFlagsResponsePB> unusual_flags_;
 
+  // May be none if the quiescing request fails.
+  boost::optional<cluster_summary::QuiescingInfo> quiescing_info_;
+
   std::atomic<uint64_t> timestamp_;
   const std::string uuid_;
   std::string location_;
diff --git a/src/kudu/tools/ksck_remote.cc b/src/kudu/tools/ksck_remote.cc
index 0571464..0f3a1da 100644
--- a/src/kudu/tools/ksck_remote.cc
+++ b/src/kudu/tools/ksck_remote.cc
@@ -63,6 +63,8 @@
 #include "kudu/tools/tool_action_common.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/locks.h"
@@ -75,7 +77,11 @@
 DECLARE_int64(timeout_ms); // defined in tool_action_common
 DECLARE_int32(fetch_info_concurrency);
 
-DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
+DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks.");
+DEFINE_bool(quiescing_info, true,
+            "Whether to display the quiescing-related information of each tablet server, "
+            "e.g. number of tablet leaders per server, the number of active scanners "
+            "per server.");
 
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduScanToken;
@@ -189,6 +195,7 @@ Status RemoteKsckTabletServer::Init() {
   const auto& host = host_port_.host();
   generic_proxy_.reset(new server::GenericServiceProxy(messenger_, addr, host));
   ts_proxy_.reset(new tserver::TabletServerServiceProxy(messenger_, addr, host));
+  ts_admin_proxy_.reset(new tserver::TabletServerAdminServiceProxy(messenger_, addr, host));
   consensus_proxy_.reset(new consensus::ConsensusServiceProxy(messenger_, addr, host));
   return Status::OK();
 }
@@ -232,6 +239,9 @@ Status RemoteKsckTabletServer::FetchInfo(ServerHealth* health) {
   }
 
   RETURN_NOT_OK(FetchCurrentTimestamp());
+  if (FLAGS_quiescing_info) {
+    FetchQuiescingInfo();
+  }
 
   state_ = KsckFetchState::FETCHED;
   *health = ServerHealth::HEALTHY;
@@ -268,6 +278,26 @@ Status RemoteKsckTabletServer::FetchCurrentTimestamp() {
   return Status::OK();
 }
 
+void RemoteKsckTabletServer::FetchQuiescingInfo() {
+  tserver::QuiesceTabletServerRequestPB req;
+  tserver::QuiesceTabletServerResponsePB resp;
+  req.set_return_stats(true);
+  RpcController rpc;
+  rpc.set_timeout(GetDefaultTimeout());
+  rpc.RequireServerFeature(tserver::TabletServerFeatures::QUIESCING);
+  Status s = ts_admin_proxy_->Quiesce(req, &resp, &rpc);
+  if (!s.ok()) {
+    LOG(WARNING) << Substitute("Couldn't fetch quiescing info from tablet server $0 ($1): $2",
+                               uuid_, address(), s.ToString());
+    return;
+  }
+  cluster_summary::QuiescingInfo qinfo;
+  qinfo.is_quiescing = resp.is_quiescing();
+  qinfo.num_leaders = resp.num_leaders();
+  qinfo.num_active_scanners = resp.num_active_scanners();
+  quiescing_info_ = qinfo;
+}
+
 Status RemoteKsckTabletServer::FetchConsensusState(ServerHealth* health) {
   DCHECK(health);
   *health = ServerHealth::UNAVAILABLE;
diff --git a/src/kudu/tools/ksck_remote.h b/src/kudu/tools/ksck_remote.h
index 9980871..c1dc4ab 100644
--- a/src/kudu/tools/ksck_remote.h
+++ b/src/kudu/tools/ksck_remote.h
@@ -56,6 +56,7 @@ class GenericServiceProxy;
 
 namespace tserver {
 class TabletServerServiceProxy;
+class TabletServerAdminServiceProxy;
 }
 
 namespace tools {
@@ -115,6 +116,8 @@ class RemoteKsckTabletServer : public KsckTabletServer,
   void FetchCurrentTimestampAsync() override;
   Status FetchCurrentTimestamp() override;
 
+  void FetchQuiescingInfo() override;
+
   void RunTabletChecksumScanAsync(
       const std::string& tablet_id,
       const Schema& schema,
@@ -148,6 +151,7 @@ class RemoteKsckTabletServer : public KsckTabletServer,
   const std::shared_ptr<rpc::Messenger> messenger_;
   std::shared_ptr<server::GenericServiceProxy> generic_proxy_;
   std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy_;
+  std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;
   std::shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
 };
 
diff --git a/src/kudu/tools/ksck_results.cc b/src/kudu/tools/ksck_results.cc
index 029b5f6..16a9506 100644
--- a/src/kudu/tools/ksck_results.cc
+++ b/src/kudu/tools/ksck_results.cc
@@ -360,11 +360,46 @@ Status PrintServerHealthSummaries(ServerType type,
   if (type == ServerType::TABLET_SERVER) {
     DataTable table({ "UUID", "Address", "Status", "Location" });
     unordered_map<string, int> location_counts;
+    bool has_quiescing_info = false;
     for (const auto& s : summaries) {
+      if (s.quiescing_info) {
+        has_quiescing_info = true;
+      }
       string location = s.ts_location.empty() ? "<none>" : s.ts_location;
       location_counts[location]++;
       table.AddRow({ s.uuid, s.address, ServerHealthToString(s.health), std::move(location) });
     }
+
+    // If any quiescing info was collected, add it too.
+    if (has_quiescing_info) {
+      vector<string> quiescing_col;
+      vector<string> num_leaders_col;
+      vector<string> num_scanners_col;
+      bool has_quiescing_server = false;
+      for (const auto& s : summaries) {
+        if (s.quiescing_info) {
+          const auto& qinfo = *s.quiescing_info;
+          if (qinfo.is_quiescing) {
+            has_quiescing_server = true;
+            quiescing_col.emplace_back("true");
+          } else {
+            quiescing_col.emplace_back("false");
+          }
+          num_leaders_col.emplace_back(IntToString(qinfo.num_leaders));
+          num_scanners_col.emplace_back(IntToString(qinfo.num_active_scanners));
+        } else {
+          quiescing_col.emplace_back("n/a");
+          num_leaders_col.emplace_back("n/a");
+          num_scanners_col.emplace_back("n/a");
+        }
+      }
+      // Only output the quiescing column if there are quiescing servers.
+      if (has_quiescing_server) {
+        table.AddColumn("Quiescing", std::move(quiescing_col));
+      }
+      table.AddColumn("Tablet Leaders", std::move(num_leaders_col));
+      table.AddColumn("Active Scanners", std::move(num_scanners_col));
+    }
     RETURN_NOT_OK(table.PrintTo(out));
 
     // Print location count table.
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 8bb3d5c..4f8f4b9 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -17,17 +17,18 @@
 
 #include <algorithm>
 #include <cstdlib>
+#include <initializer_list>
 #include <iostream>
 #include <iterator>
 #include <memory>
 #include <string>
 #include <tuple>
+#include <utility>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/basictypes.h"
@@ -386,6 +387,7 @@ unique_ptr<Mode> BuildClusterMode() {
         .AddOptionalParameter("consensus")
         .AddOptionalParameter("fetch_info_concurrency")
         .AddOptionalParameter("ksck_format")
+        .AddOptionalParameter("quiescing_info")
         .AddOptionalParameter("sections")
         .AddOptionalParameter("tables")
         .AddOptionalParameter("tablets")
diff --git a/src/kudu/tools/tool_action_tserver.cc b/src/kudu/tools/tool_action_tserver.cc
index 0540d86..84a82bc 100644
--- a/src/kudu/tools/tool_action_tserver.cc
+++ b/src/kudu/tools/tool_action_tserver.cc
@@ -286,14 +286,15 @@ Status QuiescingStatus(const RunnerContext& context) {
   req.set_return_stats(true);
   QuiesceTabletServerResponsePB resp;
   RpcController rpc;
+  rpc.RequireServerFeature(tserver::TabletServerFeatures::QUIESCING);
   RETURN_NOT_OK(proxy->Quiesce(req, &resp, &rpc));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
   DataTable table({});
   table.AddColumn("Quiescing", { resp.is_quiescing() ? "true" : "false" });
-  table.AddColumn("Tablet leader count", { IntToString(resp.num_leaders()) });
-  table.AddColumn("Active scanner count", { IntToString(resp.num_active_scanners()) });
+  table.AddColumn("Tablet Leaders", { IntToString(resp.num_leaders()) });
+  table.AddColumn("Active Scanners", { IntToString(resp.num_active_scanners()) });
   return table.PrintTo(cout);
 }
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 02da66e..b10eeff 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1013,6 +1013,17 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
   }
 }
 
+bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const {
+  switch (feature) {
+    case TabletServerFeatures::COLUMN_PREDICATES:
+    case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
+    case TabletServerFeatures::QUIESCING:
+      return true;
+    default:
+      return false;
+  }
+}
+
 void TabletServiceAdminImpl::Quiesce(const QuiesceTabletServerRequestPB* req,
                                      QuiesceTabletServerResponsePB* resp,
                                      rpc::RpcContext* context) {
@@ -2097,6 +2108,7 @@ bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
   switch (feature) {
     case TabletServerFeatures::COLUMN_PREDICATES:
     case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
+    case TabletServerFeatures::QUIESCING:
       return true;
     default:
       return false;
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index a82e09a..1aa81c3 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TSERVER_TABLET_SERVICE_H
-#define KUDU_TSERVER_TABLET_SERVICE_H
+#pragma once
 
 #include <cstdint>
 #include <memory>
@@ -217,6 +216,8 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf {
                QuiesceTabletServerResponsePB* resp,
                rpc::RpcContext* context) override;
 
+  bool SupportsFeature(uint32_t feature) const override;
+
  private:
   TabletServer* server_;
 };
@@ -284,4 +285,3 @@ class ConsensusServiceImpl : public consensus::ConsensusServiceIf {
 } // namespace tserver
 } // namespace kudu
 
-#endif
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index bbf7f43..3c5599a 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -470,4 +470,5 @@ enum TabletServerFeatures {
   COLUMN_PREDICATES = 1;
   // Whether the server supports padding UNIXTIME_MICROS slots to 16 bytes.
   PAD_UNIXTIME_MICROS_TO_16_BYTES = 2;
+  QUIESCING = 3;
 }
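
The patch above makes the quiescing fetch best effort: the client requires the
QUIESCING feature, and if the server does not advertise it, a warning is logged
and the info is simply left unset. A rough sketch of that pattern follows. It
is a sketch under stated assumptions rather than Kudu's RPC code:
`FakeTabletServer`, `max_feature`, and the free function `FetchQuiescingInfo`
are hypothetical stand-ins, and only the enum values are taken from the
tserver.proto hunk.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Feature values as listed in the tserver.proto hunk of this patch.
enum TabletServerFeatures : uint32_t {
  COLUMN_PREDICATES = 1,
  PAD_UNIXTIME_MICROS_TO_16_BYTES = 2,
  QUIESCING = 3,
};

struct QuiescingInfo {
  bool is_quiescing;
  int num_leaders;
  int num_active_scanners;
};

// Hypothetical in-process stand-in for a tablet server. 'max_feature' models
// an older server that predates some features.
struct FakeTabletServer {
  uint32_t max_feature;
  QuiescingInfo info;

  bool SupportsFeature(uint32_t feature) const {
    return feature >= COLUMN_PREDICATES && feature <= max_feature;
  }
};

// Best-effort fetch: returns true and fills 'out' on success. On failure it
// logs a warning and leaves 'out' untouched, so the caller can render "n/a".
bool FetchQuiescingInfo(const FakeTabletServer& ts, const std::string& uuid,
                        QuiescingInfo* out) {
  if (!ts.SupportsFeature(QUIESCING)) {
    std::cerr << "Couldn't fetch quiescing info from tablet server " << uuid
              << ": unsupported feature flags\n";
    return false;
  }
  *out = ts.info;
  return true;
}
```

Because a failed fetch is not treated as an error, one old server in a mixed
cluster does not change the overall health result; it only produces "n/a"
cells in the summary.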


[kudu] 01/02: subprocess: plumb Java metrics into C++

Posted by ab...@apache.org.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4176b1ad00c3fa01b808bbdd58f08f0aa4ab471
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Mar 2 21:18:41 2020 -0800

    subprocess: plumb Java metrics into C++
    
    This plumbs metrics from the Java subprocess into C++ and encapsulates
    the common code used to interact with a SubprocessServer (including the
    metrics parsing) in the new SubprocessProxy template class.
    
    This template is instantiated for Echo{Request,Response}PB messages as
    the new test-only EchoSubprocess, and the patch adds echo-specific
    histogram metrics based on those returned by the Echo Java subprocess.
    
    Change-Id: I7260ea13717dfd4af0138f77dfb6e5d239b3bee2
    Reviewed-on: http://gerrit.cloudera.org:8080/15344
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/subprocess/CMakeLists.txt           |   1 +
 src/kudu/subprocess/subprocess_proxy-test.cc | 167 +++++++++++++++++++++++++++
 src/kudu/subprocess/subprocess_proxy.h       | 102 ++++++++++++++++
 3 files changed, 270 insertions(+)
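
The idea of a proxy template parameterized on request, response, and metrics
types, as the commit message describes, can be illustrated with a toy example.
Everything below is hypothetical and is not the interface of
subprocess_proxy.h: `Proxy`, `Execute`, `EchoMetrics`, and `EchoHandler` are
names introduced for illustration, a std::function stands in for the Java
subprocess, and a vector of samples stands in for a Kudu histogram.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical request/response types standing in for Echo{Request,Response}PB.
struct EchoRequest {
  std::string data;
};
struct EchoResponse {
  std::string data;
  long execution_time_ms;  // metric reported back by the (fake) subprocess
};

// Hypothetical metrics holder; a real histogram would replace the vector.
struct EchoMetrics {
  std::vector<long> execution_time_ms;
};

// A proxy parameterized on request, response, and metrics types. It forwards
// each request to a handler and records the metric carried in the response.
template <typename Req, typename Resp, typename Metrics>
class Proxy {
 public:
  using Handler = std::function<Resp(const Req&)>;

  explicit Proxy(Handler handler) : handler_(std::move(handler)) {}

  Resp Execute(const Req& req) {
    Resp resp = handler_(req);
    metrics_.execution_time_ms.push_back(resp.execution_time_ms);
    return resp;
  }

  const Metrics& metrics() const { return metrics_; }

 private:
  Handler handler_;
  Metrics metrics_;
};

typedef Proxy<EchoRequest, EchoResponse, EchoMetrics> EchoProxy;

// Hypothetical stand-in for the echo subprocess: echoes the payload and
// reports a fixed execution time.
inline EchoResponse EchoHandler(const EchoRequest& req) {
  return EchoResponse{req.data, 3};
}
```

Keeping the metrics type as a template parameter lets each concrete proxy
declare its own metric names while sharing the request/response plumbing.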

diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt
index 62f277c..8f284f5 100644
--- a/src/kudu/subprocess/CMakeLists.txt
+++ b/src/kudu/subprocess/CMakeLists.txt
@@ -72,4 +72,5 @@ if (NOT NO_TESTS)
   )
 endif()
 
+ADD_KUDU_TEST(subprocess_proxy-test)
 ADD_KUDU_TEST(subprocess_server-test)
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
new file mode 100644
index 0000000..6d26268
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/subprocess_proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
+    "Echo subprocess inbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of response messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
+    "Echo subprocess inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
+    "Echo subprocess outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+
+
+namespace kudu {
+namespace subprocess {
+
+
+#define GINIT(member, x) member = METRIC_##x.Instantiate(entity, 0)
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+struct EchoSubprocessMetrics : public SubprocessMetrics {
+  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+    HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
+    HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
+    HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+    HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+    HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+  }
+};
+#undef HISTINIT
+#undef GINIT
+
+typedef SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics> EchoSubprocess;
+
+class EchoSubprocessTest : public KuduTest {
+ public:
+  EchoSubprocessTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                        "subprocess_proxy-test")) {}
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ResetEchoSubprocess());
+  }
+
+  Status ResetEchoSubprocess() {
+    string exe;
+    RETURN_NOT_OK(env_->GetExecutablePath(&exe));
+    const string bin_dir = DirName(exe);
+    string java_home;
+    RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
+    vector<string> argv = {
+      Substitute("$0/bin/java", java_home),
+      "-jar", Substitute("$0/kudu-subprocess-echo.jar", bin_dir)
+    };
+    echo_subprocess_ = make_shared<EchoSubprocess>(std::move(argv), metric_entity_);
+    return echo_subprocess_->Start();
+  }
+
+ protected:
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<EchoSubprocess> echo_subprocess_;
+};
+
+TEST_F(EchoSubprocessTest, TestBasicMetrics) {
+  const string kMessage = "don't catch you slippin' now";
+  const int64_t kSleepMs = 1000;
+  EchoRequestPB req;
+  req.set_data(kMessage);
+  req.set_sleep_ms(kSleepMs);
+  EchoResponsePB resp;
+  ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  ASSERT_EQ(kMessage, resp.data());
+
+  // There shouldn't be anything in the subprocess queues.
+  Histogram* in_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_length).get());
+  ASSERT_EQ(1, in_len_hist->TotalCount());
+  ASSERT_EQ(0, in_len_hist->MaxValueForTests());
+  Histogram* out_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_length).get());
+  ASSERT_EQ(1, out_len_hist->TotalCount());
+  ASSERT_EQ(0, out_len_hist->MaxValueForTests());
+
+  // We should have some non-negative queue times.
+  Histogram* out_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_outbound_queue_time_ms).get());
+  ASSERT_EQ(1, out_hist->TotalCount());
+  ASSERT_LE(0, out_hist->MaxValueForTests());
+  Histogram* in_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_inbound_queue_time_ms).get());
+  ASSERT_EQ(1, in_hist->TotalCount());
+  ASSERT_LE(0, in_hist->MaxValueForTests());
+
+  // The execution should've taken at least our sleep time.
+  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_execution_time_ms).get());
+  ASSERT_EQ(1, exec_hist->TotalCount());
+  ASSERT_LE(kSleepMs, exec_hist->MaxValueForTests());
+}
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
new file mode 100644
index 0000000..3d60311
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace subprocess {
+
+// TODO(awong): add server metrics.
+struct SubprocessMetrics {
+  scoped_refptr<Histogram> inbound_queue_length;
+  scoped_refptr<Histogram> outbound_queue_length;
+  scoped_refptr<Histogram> inbound_queue_time_ms;
+  scoped_refptr<Histogram> outbound_queue_time_ms;
+  scoped_refptr<Histogram> execution_time_ms;
+};
+
+// Template that wraps a SubprocessServer, exposing only the underlying ReqPB
+// and RespPB as an interface. The given MetricsPB will be initialized,
+// allowing for metrics specific to each specialized SubprocessServer.
+template<class ReqPB, class RespPB, class MetricsPB>
+class SubprocessProxy {
+ public:
+  SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity)
+      : server_(std::move(argv)), metrics_(entity) {}
+
+  // Starts the underlying subprocess.
+  Status Start() {
+    return server_.Init();
+  }
+
+  // Executes the given request and populates the given response, returning a
+  // non-OK Status if there was an error sending the request (e.g. timed out)
+  // or if there was an error in the response.
+  Status Execute(const ReqPB& req, RespPB* resp) {
+    SubprocessRequestPB sreq;
+    sreq.mutable_request()->PackFrom(req);
+    SubprocessResponsePB sresp;
+    RETURN_NOT_OK(server_.Execute(&sreq, &sresp));
+    if (!sresp.response().UnpackTo(resp)) {
+      LOG(ERROR) << strings::Substitute("unable to unpack response: $0",
+                                        pb_util::SecureDebugString(sresp));
+      return Status::Corruption("unable to unpack response");
+    }
+    // The subprocess metrics should still be valid regardless of whether there
+    // was an error, so parse them first.
+    if (sresp.has_metrics()) {
+      ParseMetricsPB(sresp.metrics());
+    }
+    if (sresp.has_error()) {
+      return StatusFromPB(sresp.error());
+    }
+    return Status::OK();
+  }
+ private:
+  // Parses the given metrics protobuf and updates 'metrics_' based on its
+  // contents.
+  void ParseMetricsPB(const SubprocessMetricsPB& pb) {
+    DCHECK(pb.has_inbound_queue_length());
+    DCHECK(pb.has_outbound_queue_length());
+    DCHECK(pb.has_inbound_queue_time_ms());
+    DCHECK(pb.has_outbound_queue_time_ms());
+    DCHECK(pb.has_execution_time_ms());
+    metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
+    metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
+    metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+    metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+    metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+  }
+
+  SubprocessServer server_;
+  MetricsPB metrics_;
+};
+
+} // namespace subprocess
+} // namespace kudu