You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2020/08/15 14:59:41 UTC

[kudu] 13/23: [collector] add service monitor

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to tag kudu-1.12.0-mdh1.0.0-4c2c075-centos-release
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0abd7ecf9ff6e46c86f42e7abed1b8a935cc55c2
Author: 张一帆 <zh...@xiaomi.com>
AuthorDate: Tue Feb 18 10:52:55 2020 +0800

    [collector] add service monitor
---
 src/kudu/collector/CMakeLists.txt     |   4 +-
 src/kudu/collector/collector.cc       |   5 +
 src/kudu/collector/collector.h        |   2 +
 src/kudu/collector/nodes_checker.cc   |   2 +-
 src/kudu/collector/service_monitor.cc | 492 ++++++++++++++++++++++++++++++++++
 src/kudu/collector/service_monitor.h  |  92 +++++++
 src/kudu/scripts/falcon_screen.json   |   6 +-
 7 files changed, 598 insertions(+), 5 deletions(-)

diff --git a/src/kudu/collector/CMakeLists.txt b/src/kudu/collector/CMakeLists.txt
index 5bbb1cb..7cdf30b 100644
--- a/src/kudu/collector/CMakeLists.txt
+++ b/src/kudu/collector/CMakeLists.txt
@@ -26,10 +26,12 @@ set(COLLECTOR_SRCS
   falcon_reporter.cc
   local_reporter.cc
   metrics_collector.cc
-  nodes_checker.cc)
+  nodes_checker.cc
+  service_monitor.cc)
 
 add_library(collector ${COLLECTOR_SRCS})
 target_link_libraries(collector
+  kudu_client
   kudu_curl_util
   kudu_tools_test_util
   log
diff --git a/src/kudu/collector/collector.cc b/src/kudu/collector/collector.cc
index 59f36df..3501300 100644
--- a/src/kudu/collector/collector.cc
+++ b/src/kudu/collector/collector.cc
@@ -29,6 +29,7 @@
 #include "kudu/collector/metrics_collector.h"
 #include "kudu/collector/nodes_checker.h"
 #include "kudu/collector/reporter_base.h"
+#include "kudu/collector/service_monitor.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/security/init.h"
 #include "kudu/util/env.h"
@@ -91,6 +92,8 @@ Status Collector::Init() {
   CHECK_OK(metrics_collector_->Init());
   cluster_rebalancer_.reset(new ClusterRebalancer());
   CHECK_OK(cluster_rebalancer_->Init());
+  service_monitor_.reset(new ServiceMonitor(reporter_));
+  CHECK_OK(service_monitor_->Init());
 
   initialized_ = true;
   return Status::OK();
@@ -107,6 +110,7 @@ Status Collector::Start() {
   nodes_checker_->Start();
   metrics_collector_->Start();
   cluster_rebalancer_->Start();
+  service_monitor_->Start();
 
   return Status::OK();
 }
@@ -120,6 +124,7 @@ void Collector::Shutdown() {
     metrics_collector_->Shutdown();
     nodes_checker_->Shutdown();
     cluster_rebalancer_->Shutdown();
+    service_monitor_->Shutdown();
 
     stop_background_threads_latch_.CountDown();
 
diff --git a/src/kudu/collector/collector.h b/src/kudu/collector/collector.h
index e135a3c..12abaf2 100644
--- a/src/kudu/collector/collector.h
+++ b/src/kudu/collector/collector.h
@@ -37,6 +37,7 @@ class ClusterRebalancer;
 class MetricsCollector;
 class NodesChecker;
 class ReporterBase;
+class ServiceMonitor;
 
 class Collector {
  public:
@@ -64,6 +65,7 @@ class Collector {
   scoped_refptr<MetricsCollector> metrics_collector_;
   scoped_refptr<NodesChecker> nodes_checker_;
   scoped_refptr<ClusterRebalancer> cluster_rebalancer_;
+  scoped_refptr<ServiceMonitor> service_monitor_;
 
   CountDownLatch stop_background_threads_latch_;
   scoped_refptr<Thread> excess_log_deleter_thread_;
diff --git a/src/kudu/collector/nodes_checker.cc b/src/kudu/collector/nodes_checker.cc
index b2dacd8..036d72a 100644
--- a/src/kudu/collector/nodes_checker.cc
+++ b/src/kudu/collector/nodes_checker.cc
@@ -142,7 +142,7 @@ void NodesChecker::NodesCheckerThread() {
     UpdateAndCheckNodes();
     check_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
   } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(check_time));
-  LOG(INFO) << "FalconPusherThread exit";
+  LOG(INFO) << "NodesCheckerThread exit";
 }
 
 void NodesChecker::UpdateAndCheckNodes() {
diff --git a/src/kudu/collector/service_monitor.cc b/src/kudu/collector/service_monitor.cc
new file mode 100644
index 0000000..a92d309
--- /dev/null
+++ b/src/kudu/collector/service_monitor.cc
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/service_monitor.h"
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <ostream>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/value.h"
+#include "kudu/client/write_op.h"
+#include "kudu/collector/collector_util.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+DEFINE_string(collector_monitor_table_name, "system.monitor",
+              "Table name of monitor table.");
+DEFINE_uint32(collector_check_monitor_table_interval_sec, 3600,
+              "Number of interval seconds to check monitor table.");
+DEFINE_uint32(collector_monitor_avg_record_count_per_tablet, 100,
+              "Average record count for each tablet.");
+DEFINE_uint32(collector_monitor_avg_tablets_count_on_each_node, 10,
+              "Number of tablets of monitor table on each tablet server.");
+DEFINE_uint32(collector_monitor_timeout_threshold_sec, 30,
+              "If operations for checkintg service and record the result "
+              "take more than this number of seconds, "
+              "issue a warning with a trace.");
+DEFINE_uint32(collector_monitor_upsert_timeout_ms, 100,
+              "Timeout for one upsert operation");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_string(collector_master_addrs);
+DECLARE_uint32(collector_interval_sec);
+DECLARE_uint32(collector_warn_threshold_ms);
+
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduInsert;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::KuduTablet;
+using kudu::client::KuduTabletServer;
+using kudu::client::KuduUpsert;
+using kudu::client::KuduValue;
+using kudu::client::sp::shared_ptr;
+using kudu::KuduPartialRow;
+
+using std::list;
+using std::string;
+using std::vector;
+using std::unique_ptr;
+using std::unordered_map;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+ServiceMonitor::ServiceMonitor(scoped_refptr<ReporterBase> reporter)
+  : initialized_(false),
+    reporter_(std::move(reporter)),
+    stop_background_threads_latch_(1) {
+}
+
+ServiceMonitor::~ServiceMonitor() {
+  Shutdown();
+}
+
+Status ServiceMonitor::Init() {
+  CHECK(!initialized_);
+
+  RETURN_NOT_OK(InitCilent());
+  CHECK(client_);
+
+  last_check_table_time_ = MonoTime::Now();
+  RETURN_NOT_OK(CheckMonitorTable());
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status ServiceMonitor::Start() {
+  CHECK(initialized_);
+
+  RETURN_NOT_OK(StartServiceMonitorThread());
+
+  return Status::OK();
+}
+
+void ServiceMonitor::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    stop_background_threads_latch_.CountDown();
+
+    if (service_monitor_thread_) {
+      service_monitor_thread_->Join();
+    }
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string ServiceMonitor::ToString() const {
+  return "ServiceMonitor";
+}
+
+Status ServiceMonitor::InitCilent() {
+  CHECK(client_.get() == nullptr);
+  const vector<string>& master_addresses =
+      Split(FLAGS_collector_master_addrs, ",", strings::SkipEmpty());
+  return KuduClientBuilder()
+      .master_server_addrs(master_addresses)
+      .Build(&client_);
+}
+
+KuduSchema ServiceMonitor::CreateTableSchema() {
+  KuduSchema schema;
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
+  b.AddColumn("value")->Type(KuduColumnSchema::INT64);
+  b.AddColumn("total_count")->Type(KuduColumnSchema::INT32);
+  b.AddColumn("success_count")->Type(KuduColumnSchema::INT32);
+  CHECK_OK(b.Build(&schema));
+  return schema;
+}
+
+Status ServiceMonitor::CreateMonitorTable(const string& table_name) {
+  vector<KuduTabletServer*> servers;
+  ElementDeleter deleter(&servers);
+  RETURN_NOT_OK(client_->ListTabletServers(&servers));
+  int num_tablets = servers.size() * FLAGS_collector_monitor_avg_tablets_count_on_each_node;
+
+  KuduSchema schema(CreateTableSchema());
+  vector<string> hash_keys = {"key"};
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(table_name)
+      .schema(&schema)
+      .add_hash_partitions(hash_keys, num_tablets)
+      .num_replicas(3);
+  RETURN_NOT_OK(table_creator->Create());
+  LOG(INFO) << Substitute("Created table $0.", table_name);
+  return Status::OK();
+}
+
+Status ServiceMonitor::CheckMonitorTable() {
+  string table_name = FLAGS_collector_monitor_table_name;
+  LOG(INFO) << Substitute("Checking monitor table $0.", table_name);
+  bool exist = false;
+  RETURN_NOT_OK(client_->TableExists(table_name, &exist));
+  if (!exist) {
+    RETURN_NOT_OK(CreateMonitorTable(table_name));
+  }
+
+  // Check monitor table's schema.
+  KuduSchema schema;
+  RETURN_NOT_OK(client_->GetTableSchema(table_name, &schema));
+  if (!schema.Equals(CreateTableSchema())) {
+    LOG(FATAL) << Substitute("$0 table $0 has an incorrect schema.", table_name);
+  }
+
+  // Check if monitor table's tablet count matches the cluster's node count.
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable(table_name, &table));
+  vector<KuduScanToken*> tokens;
+  ElementDeleter token_deleter(&tokens);
+  KuduScanTokenBuilder builder(table.get());
+  RETURN_NOT_OK(builder.Build(&tokens));
+  int replica_count = tokens.size() * table->num_replicas();
+  vector<KuduTabletServer*> servers;
+  ElementDeleter deleter(&servers);
+  RETURN_NOT_OK(client_->ListTabletServers(&servers));
+  if (replica_count < servers.size()) {
+    LOG(FATAL) <<
+        Substitute("$0 table's replica count doesn't match cluster's node count.", table_name);
+  }
+
+  // Check if all tablet servers at least has one leader replica running on it.
+  unordered_map<string, vector<string>> ts_tablets;
+  unordered_map<string, int> ts_leader_replica_count;
+  for (const auto* token : tokens) {
+    const auto& tablet =  token->tablet();
+    for (const auto* replica : tablet.replicas()) {
+      string ts = replica->ts().uuid();
+      if (replica->is_leader()) {
+        EmplaceIfNotPresent(&ts_leader_replica_count, ts, 0);
+        auto& leader_count = FindOrDie(ts_leader_replica_count, ts);
+        leader_count++;
+      }
+      EmplaceIfNotPresent(&ts_tablets, ts, vector<string>());
+      auto& tablets = FindOrDie(ts_tablets, ts);
+      tablets.emplace_back(tablet.id());
+    }
+  }
+  for (const auto* server : servers) {
+    const string& ts_uuid = server->uuid();
+    if (ContainsKey(ts_leader_replica_count, ts_uuid)) {
+      auto& leader_replica_count = FindOrDie(ts_leader_replica_count, ts_uuid);
+      LOG(INFO) << Substitute("TS $0 has $1 leader replicas on it",
+                              ts_uuid, leader_replica_count);
+      continue;
+    }
+    if (!ContainsKey(ts_tablets, ts_uuid)) {
+      LOG(WARNING) << Substitute("TS $0 has no replica running on it", ts_uuid);
+      RETURN_NOT_OK(RebalanceMonitorTable());
+      RETURN_NOT_OK(CheckMonitorTable());
+      return Status::OK();
+    }
+
+    LOG(WARNING) << Substitute("TS $0 has no leader replica running on it", ts_uuid);
+    const auto& tablets = FindOrDie(ts_tablets, ts_uuid);
+    string leader_step_down_tablet =
+        FindLeaderStepDownTablet(ts_leader_replica_count,
+                                 tablets,
+                                 FLAGS_collector_monitor_avg_tablets_count_on_each_node);
+    if (!leader_step_down_tablet.empty()) {
+      RETURN_NOT_OK(CallLeaderStepDown(leader_step_down_tablet, ts_uuid));
+      continue;
+    }
+    leader_step_down_tablet = FindLeaderStepDownTablet(ts_leader_replica_count,tablets, 1);
+    if (!leader_step_down_tablet.empty()) {
+      RETURN_NOT_OK(CallLeaderStepDown(leader_step_down_tablet, ts_uuid));
+      continue;
+    }
+    LOG(FATAL) << Substitute(
+        "Unable to call leader_step_down for replicas on ts $0, "
+        "set a larger number for 'collector_monitor_avg_tablets_count_on_each_node' ", ts_uuid);
+  }
+
+  return Status::OK();
+}
+
+Status ServiceMonitor::RebalanceMonitorTable() {
+  vector<string> args = {
+    "cluster",
+    "rebalance",
+    FLAGS_collector_master_addrs,
+    "--tables=" + FLAGS_collector_monitor_table_name
+  };
+  string tool_stdout;
+  string tool_stderr;
+  RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+                        Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+  LOG(INFO) << std::endl
+            << tool_stdout;
+  return Status::OK();
+}
+
+string ServiceMonitor::FindLeaderStepDownTablet(
+    const unordered_map<string, int>& ts_leader_replica_count,
+    const vector<string>& tablets,
+    int least_num_of_leader_replicas) {
+  string leader_step_down_tablet;
+  for (const auto& tablet : tablets) {
+    string leader_host_uuid;
+    Status s = GetLeaderHost(tablet, &leader_host_uuid);
+    if (!s.ok()) {
+      LOG(WARNING) << s.ToString();
+      continue;
+    }
+    CHECK(!leader_host_uuid.empty());
+    auto& leader_replica_count = FindOrDie(ts_leader_replica_count, leader_host_uuid);
+    if (leader_replica_count > least_num_of_leader_replicas) {
+      leader_step_down_tablet = tablet;
+      break;
+    }
+  }
+  return leader_step_down_tablet;
+}
+
+Status ServiceMonitor::GetLeaderHost(const string& tablet_id, string* leader_host) {
+  KuduTablet* tablet_raw = nullptr;
+  RETURN_NOT_OK(client_->GetTablet(tablet_id, &tablet_raw));
+  unique_ptr<KuduTablet> tablet(tablet_raw);
+  for (const auto* r : tablet->replicas()) {
+    if (r->is_leader()) {
+      *leader_host = r->ts().uuid();
+      return Status::OK();
+    }
+  }
+  return Status::NotFound(Substitute("No leader replica found for tablet $0", tablet_id));
+}
+
+Status ServiceMonitor::CallLeaderStepDown(const string& tablet_id, const string& ts_uuid) {
+  vector<string> args = {
+    "tablet",
+    "leader_step_down",
+    FLAGS_collector_master_addrs,
+    tablet_id,
+    "--new_leader_uuid=" + ts_uuid
+  };
+  string tool_stdout;
+  string tool_stderr;
+  RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+                        Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+  LOG(INFO) << std::endl
+            << tool_stdout;
+  return Status::OK();
+}
+
+Status ServiceMonitor::StartServiceMonitorThread() {
+  return Thread::Create("collector", "nodes-checker", &ServiceMonitor::ServiceMonitorThread,
+                        this, &service_monitor_thread_);
+}
+
+void ServiceMonitor::ServiceMonitorThread() {
+  MonoTime check_time;
+  do {
+    check_time = MonoTime::Now();
+    CheckService();
+    check_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
+  } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(check_time));
+  LOG(INFO) << "ServiceMonitorThread exit";
+}
+
+void ServiceMonitor::CheckService() {
+  int32_t elapsed_seconds = (MonoTime::Now() - last_check_table_time_).ToSeconds();
+  if (elapsed_seconds >= FLAGS_collector_check_monitor_table_interval_sec) {
+    last_check_table_time_ = MonoTime::Now();
+    WARN_NOT_OK(CheckMonitorTable(), "Unable to check monitor table");
+  }
+
+  LOG(INFO) << "Start to CheckService";
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "ServiceMonitor::CheckService");
+  TRACE("init");
+  bool exist = false;
+  CHECK_OK(client_->TableExists(FLAGS_collector_monitor_table_name, &exist));
+  if (!exist) {
+    WARN_NOT_OK(CheckMonitorTable(), "Unable to check monitor table");
+  }
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable(FLAGS_collector_monitor_table_name, &table));
+
+  WARN_NOT_OK(UpsertAndScanRows(table), "Unable to upsert and scan some rows");
+
+  int64_t elapsed_sec = (MonoTime::Now() - start).ToSeconds();
+  if (elapsed_sec > FLAGS_collector_monitor_timeout_threshold_sec) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+}
+
+Status ServiceMonitor::UpsertAndScanRows(const shared_ptr<KuduTable>& table) {
+  shared_ptr<KuduSession> session = table->client()->NewSession();
+  RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+  session->SetTimeoutMillis(FLAGS_collector_monitor_upsert_timeout_ms);
+  vector<KuduScanToken*> tokens;
+  ElementDeleter deleter(&tokens);
+  KuduScanTokenBuilder builder(table.get());
+  RETURN_NOT_OK(builder.Build(&tokens));
+  int record_count = tokens.size() * FLAGS_collector_monitor_avg_record_count_per_tablet;
+  int64_t timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  // Check if we can upsert some rows.
+  int write_success = 0;
+  MonoTime start(MonoTime::Now());
+  for (int i = 0; i < record_count; i++) {
+    KuduUpsert* upsert = table->NewUpsert();
+    KuduPartialRow* row = upsert->mutable_row();
+    RETURN_NOT_OK(row->SetInt64("key", i));
+    RETURN_NOT_OK(row->SetInt64("value", timestamp));
+    Status s = session->Apply(upsert);
+    if (s.ok()) {
+      write_success++;
+    } else {
+      LOG(WARNING) << s.ToString() <<  Substitute(": unable to upsert row (id=$0).", i);
+    }
+  }
+  int64_t write_latency_ms = (MonoTime::Now() - start).ToMilliseconds();
+  TRACE("Upsert some rows");
+  if (write_success != record_count) {
+    LOG(WARNING) << Substitute("Expect to upsert $0 rows, actually upsert $1 rows.",
+                                record_count, write_success);
+  }
+
+  // Check if rows upserted
+  KuduScanner scanner(table.get());
+  RETURN_NOT_OK(scanner.SetFaultTolerant());
+  // Add a predicate: WHERE key >= 0
+  KuduPredicate* p = table->NewComparisonPredicate(
+      "key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(0));
+  RETURN_NOT_OK(scanner.AddConjunctPredicate(p));
+  // Add a predicate: WHERE key < record_count
+  p = table->NewComparisonPredicate(
+      "key", KuduPredicate::LESS, KuduValue::FromInt(record_count));
+  RETURN_NOT_OK(scanner.AddConjunctPredicate(p));
+  RETURN_NOT_OK(scanner.Open());
+
+  int read_success = 0;
+  start = MonoTime::Now();
+  KuduScanBatch batch;
+  while (scanner.HasMoreRows()) {
+    RETURN_NOT_OK(scanner.NextBatch(&batch));
+    for (KuduScanBatch::const_iterator it = batch.begin(); it != batch.end(); ++it) {
+      KuduScanBatch::RowPtr row(*it);
+      int64_t val;
+      RETURN_NOT_OK(row.GetInt64("value", &val));
+      if (val == timestamp) {
+        read_success++;
+      }
+    }
+  }
+  int64_t scan_latency_ms = (MonoTime::Now() - start).ToMilliseconds();
+  TRACE("Scan some rows");
+  if (read_success != write_success) {
+    LOG(WARNING) << Substitute("Expect to get $0 rows, actually get $1 rows.",
+                                write_success, read_success);
+  }
+
+  double total_count = record_count* 2;
+  double success_count = write_success + read_success;
+  double kudu_success = success_count/total_count*100;
+
+  KuduInsert* insert = table->NewInsert();
+  KuduPartialRow* row = insert->mutable_row();
+  RETURN_NOT_OK(row->SetInt64("key", timestamp));
+  RETURN_NOT_OK(row->SetInt32("total_count", total_count));
+  RETURN_NOT_OK(row->SetInt32("success_count", success_count));
+  RETURN_NOT_OK(session->Apply(insert));
+  RETURN_NOT_OK(session->Close());
+
+  unordered_map<string, int64_t> report_metrics;
+  report_metrics.emplace("kudu.scanLatency", scan_latency_ms);
+  report_metrics.emplace("kudu.writeLatency", write_latency_ms);
+  report_metrics.emplace("kudu.success", kudu_success);
+  list<scoped_refptr<ItemBase>> items;
+  for (const auto& elem : report_metrics) {
+    items.emplace_back(reporter_->ConstructItem(
+      FLAGS_collector_cluster_name,
+      elem.first,
+      "cluster",
+      timestamp,
+      elem.second,
+      "GAUGE",
+      ""));
+  }
+  reporter_->PushItems(std::move(items));
+  TRACE("Pushed results");
+
+  return Status::OK();
+}
+
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/service_monitor.h b/src/kudu/collector/service_monitor.h
new file mode 100644
index 0000000..ab1515a
--- /dev/null
+++ b/src/kudu/collector/service_monitor.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Thread;
+
+namespace client {
+class KuduClient;
+class KuduTable;
+} // namespace client
+
+namespace collector {
+
+class ReporterBase;
+
+class ServiceMonitor : public RefCounted<ServiceMonitor> {
+ public:
+  explicit ServiceMonitor(scoped_refptr<ReporterBase> reporter);
+  ~ServiceMonitor();
+
+  Status Init();
+  Status Start();
+  void Shutdown();
+
+  std::string ToString() const;
+
+ private:
+  friend class RefCounted<ServiceMonitor>;
+
+  Status StartServiceMonitorThread();
+  void ServiceMonitorThread();
+  void CheckService();
+
+  client::KuduSchema CreateTableSchema();
+
+  Status CallLeaderStepDown(const std::string& tablet_id, const std::string& ts_uuid);
+  Status CheckMonitorTable();
+  Status CreateMonitorTable(const std::string& table_name);
+  Status InitCilent();
+  Status RebalanceMonitorTable();
+  Status UpsertAndScanRows(const client::sp::shared_ptr<client::KuduTable>& table);
+
+  // Find a tablet from 'tablets', whose leader replica host has n leader replicas,
+  // n > least_num_of_leader_replicas.
+  std::string FindLeaderStepDownTablet(
+      const std::unordered_map<std::string, int>& ts_leader_replica_count,
+      const std::vector<std::string>& tablets,
+      int least_num_of_leader_replicas);
+
+  // Get leader host uuid for a given tablet id.
+  Status GetLeaderHost(const std::string& tablet_id, std::string* leader_host);
+
+  bool initialized_;
+
+  client::sp::shared_ptr<client::KuduClient> client_;
+  scoped_refptr<ReporterBase> reporter_;
+  CountDownLatch stop_background_threads_latch_;
+  scoped_refptr<Thread> service_monitor_thread_;
+  MonoTime last_check_table_time_;
+
+  DISALLOW_COPY_AND_ASSIGN(ServiceMonitor);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/scripts/falcon_screen.json b/src/kudu/scripts/falcon_screen.json
index 68911d9..e3ae54c 100644
--- a/src/kudu/scripts/falcon_screen.json
+++ b/src/kudu/scripts/falcon_screen.json
@@ -213,9 +213,9 @@
         "metric=write_transactions_inflight service=kudu cluster=${cluster.name} level=${level} v=4"
       ],
       "cluster" : [
-          "metric=kudu.success service=kudu level=${level}",
-          "metric=kudu.writeLatency service=kudu level=${level}",
-          "metric=kudu.scanLatency service=kudu level=${level}",
+          "metric=kudu.success service=kudu cluster=${cluster.name} level=${level} v=4",
+          "metric=kudu.writeLatency service=kudu cluster=${cluster.name} level=${level} v=4",
+          "metric=kudu.scanLatency service=kudu cluster=${cluster.name} level=${level} v=4",
           "metric=healthy_table_proportion service=kudu cluster=${cluster.name} level=${level} v=4"
       ],
       "cluster_stat" : [