You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2020/08/15 14:59:35 UTC

[kudu] 07/23: [collector] Add CPP implemented collector

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to tag kudu-1.12.0-mdh1.0.0-4c2c075-centos-release
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d9b29d901bcbabd1210064f84d73ece9931baff2
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Wed Jun 19 16:20:06 2019 +0800

    [collector] Add CPP implemented collector
    
    Change-Id: I498347605a09e832d3398e76d9cefd3e52b1cfc6
---
 CMakeLists.txt                                |   1 +
 src/kudu/collector/CMakeLists.txt             |  63 ++
 src/kudu/collector/cluster_rebalancer-test.cc |  50 ++
 src/kudu/collector/cluster_rebalancer.cc      | 152 +++++
 src/kudu/collector/cluster_rebalancer.h       |  68 ++
 src/kudu/collector/collector-test.cc          |  44 ++
 src/kudu/collector/collector.cc               | 170 +++++
 src/kudu/collector/collector.h                |  74 +++
 src/kudu/collector/collector_main.cc          |  73 +++
 src/kudu/collector/collector_util-test.cc     |  33 +
 src/kudu/collector/collector_util.cc          |  46 ++
 src/kudu/collector/collector_util.h           |  32 +
 src/kudu/collector/falcon_reporter-test.cc    | 122 ++++
 src/kudu/collector/falcon_reporter.cc         | 255 ++++++++
 src/kudu/collector/falcon_reporter.h          | 108 ++++
 src/kudu/collector/local_reporter.cc          |  84 +++
 src/kudu/collector/local_reporter.h           |  58 ++
 src/kudu/collector/metrics_collector-test.cc  | 777 +++++++++++++++++++++++
 src/kudu/collector/metrics_collector.cc       | 852 ++++++++++++++++++++++++++
 src/kudu/collector/metrics_collector.h        | 205 +++++++
 src/kudu/collector/nodes_checker-test.cc      |  55 ++
 src/kudu/collector/nodes_checker.cc           | 358 +++++++++++
 src/kudu/collector/nodes_checker.h            |  90 +++
 src/kudu/collector/reporter_base.h            |  73 +++
 src/kudu/master/catalog_manager.cc            |   1 +
 src/kudu/tools/tool_action_table.cc           |   2 -
 src/kudu/util/jsonreader.h                    |   6 +
 27 files changed, 3850 insertions(+), 2 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d2bb441..1797da9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1423,6 +1423,7 @@ add_subdirectory(src/kudu/cfile)
 add_subdirectory(src/kudu/client)
 add_subdirectory(src/kudu/clock)
 add_subdirectory(src/kudu/codegen)
+add_subdirectory(src/kudu/collector)
 add_subdirectory(src/kudu/common)
 add_subdirectory(src/kudu/consensus)
 add_subdirectory(src/kudu/experiments)
diff --git a/src/kudu/collector/CMakeLists.txt b/src/kudu/collector/CMakeLists.txt
new file mode 100644
index 0000000..5bbb1cb
--- /dev/null
+++ b/src/kudu/collector/CMakeLists.txt
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#########################################
+# collector
+#########################################
+
+set(COLLECTOR_SRCS
+  cluster_rebalancer.cc
+  collector.cc
+  collector_util.cc
+  falcon_reporter.cc
+  local_reporter.cc
+  metrics_collector.cc
+  nodes_checker.cc)
+
+add_library(collector ${COLLECTOR_SRCS})
+target_link_libraries(collector
+  kudu_curl_util
+  kudu_tools_test_util
+  log
+  security
+  server_process)
+
+#########################################
+# kudu-collector
+#########################################
+
+add_executable(kudu-collector collector_main.cc)
+target_link_libraries(kudu-collector
+  ${SANITIZER_OPTIONS_OVERRIDE}
+  ${KRB5_REALM_OVERRIDE}
+  collector
+  ${KUDU_BASE_LIBS})
+
+option(KUDU_COLLECTOR_INSTALL "Whether to install the Kudu Collector executable" ON)
+if(KUDU_COLLECTOR_INSTALL)
+  install(TARGETS kudu-collector RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR})
+else()
+  message(STATUS "Skipping install rule for the Kudu Collector executable")
+endif()
+
+SET_KUDU_TEST_LINK_LIBS(collector)
+ADD_KUDU_TEST(collector-test)
+ADD_KUDU_TEST(collector_util-test)
+ADD_KUDU_TEST(cluster_rebalancer-test)
+ADD_KUDU_TEST(falcon_reporter-test)
+ADD_KUDU_TEST(metrics_collector-test)
+ADD_KUDU_TEST(nodes_checker-test)
diff --git a/src/kudu/collector/cluster_rebalancer-test.cc b/src/kudu/collector/cluster_rebalancer-test.cc
new file mode 100644
index 0000000..0a1e0a7
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer-test.cc
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/cluster_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace collector {
+
+TEST(TestClusterRebalancer, TestValidateHMTime) {
+  // 'time' in error format.
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:34:56").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("1:23").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:3").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime(":3").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12.34").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("-1:30").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("24:30").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:-1").IsInvalidArgument());
+  ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:60").IsInvalidArgument());
+
+  // 'time' in valid format.
+  ASSERT_OK(ClusterRebalancer::ValidateHMTime("12:34"));
+  ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:00"));
+  ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:59"));
+  ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:00"));
+  ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:59"));
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/cluster_rebalancer.cc b/src/kudu/collector/cluster_rebalancer.cc
new file mode 100644
index 0000000..0015544
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/cluster_rebalancer.h"
+
+#include <stdio.h>
+#include <time.h>
+
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_bool(auto_rebalance, true, "Whether to rebalance cluster automatically");
+DEFINE_string(rebalance_time, "00:00",
+              "Time to perform cluster rebalance, format in HH:MM");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_string(collector_master_addrs);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+ClusterRebalancer::ClusterRebalancer()
+  : initialized_(false),
+    stop_background_threads_latch_(1) {
+}
+
+ClusterRebalancer::~ClusterRebalancer() {
+  Shutdown();
+}
+
+Status ClusterRebalancer::Init() {
+  CHECK(!initialized_);
+
+  RETURN_NOT_OK(ValidateHMTime(FLAGS_rebalance_time));
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status ClusterRebalancer::Start() {
+  CHECK(initialized_);
+
+  if (!FLAGS_auto_rebalance) {
+    return Status::OK();
+  }
+
+  RETURN_NOT_OK(StartClusterRebalancerThread());
+
+  return Status::OK();
+}
+
+void ClusterRebalancer::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    stop_background_threads_latch_.CountDown();
+
+    if (cluster_rebalancer_thread_) {
+      cluster_rebalancer_thread_->Join();
+    }
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string ClusterRebalancer::ToString() const {
+  return "ClusterRebalancer";
+}
+
+Status ClusterRebalancer::StartClusterRebalancerThread() {
+  return Thread::Create("server", "cluster-rebalancer", &ClusterRebalancer::ClusterRebalancerThread,
+                        this, &cluster_rebalancer_thread_);
+}
+
+void ClusterRebalancer::ClusterRebalancerThread() {
+  const MonoDelta kWait = MonoDelta::FromSeconds(60);
+  while (!RunOnceMode() && !stop_background_threads_latch_.WaitFor(kWait)) {
+    string dst;
+    StringAppendStrftime(&dst, "%H:%M", time(nullptr), true);
+    if (dst == FLAGS_rebalance_time) {
+      WARN_NOT_OK(RebalanceCluster(), "Unable to rebalance cluster");
+    }
+  }
+}
+
+Status ClusterRebalancer::RebalanceCluster() {
+  vector<string> args = {
+    "cluster",
+    "rebalance",
+    FLAGS_collector_master_addrs
+  };
+  string tool_stdout;
+  string tool_stderr;
+  RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+                        Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+  LOG(INFO) << std::endl
+            << tool_stdout;
+  return Status::OK();
+}
+
+Status ClusterRebalancer::ValidateHMTime(const string& time) {
+  Status err = Status::InvalidArgument(
+      Substitute("Invalid time format '$0', should in format 'HH:MM'", time));
+  if (time.size() != 5) {
+    return err;
+  }
+
+  int hour, minute;
+  int count = sscanf(time.c_str(), "%d:%d", &hour, &minute);
+  if (count == 2 &&
+      0 <= hour && hour < 24 &&
+      0 <= minute && minute < 60) {
+    return Status::OK();
+  }
+
+  return err;
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/cluster_rebalancer.h b/src/kudu/collector/cluster_rebalancer.h
new file mode 100644
index 0000000..ed08736
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+}  // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class ClusterRebalancer : public RefCounted<ClusterRebalancer> {
+ public:
+  ClusterRebalancer();
+  ~ClusterRebalancer();
+
+  Status Init();
+  Status Start();
+  void Shutdown();
+
+  std::string ToString() const;
+
+ private:
+  friend class RefCounted<ClusterRebalancer>;
+
+  FRIEND_TEST(TestClusterRebalancer, TestValidateHMTime);
+
+  // Start thread to rebalance cluster.
+  Status StartClusterRebalancerThread();
+  void ClusterRebalancerThread();
+  static Status RebalanceCluster();
+
+  static Status ValidateHMTime(const std::string& time);
+
+  bool initialized_;
+
+  CountDownLatch stop_background_threads_latch_;
+
+  scoped_refptr<Thread> cluster_rebalancer_thread_;
+
+  DISALLOW_COPY_AND_ASSIGN(ClusterRebalancer);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector-test.cc b/src/kudu/collector/collector-test.cc
new file mode 100644
index 0000000..e8e1298
--- /dev/null
+++ b/src/kudu/collector/collector-test.cc
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace collector {
+
+TEST(TestCollector, TestValidateIntervalAndTimeout) {
+  // 'interval' in error range.
+  ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(9, 1).IsInvalidArgument());
+  ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(61, 1).IsInvalidArgument());
+
+  // 'timeout' in error range.
+  ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 0).IsInvalidArgument());
+  ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 10).IsInvalidArgument());
+
+  // Both 'interval' and 'timeout' are in valid range.
+  ASSERT_OK(Collector::ValidateIntervalAndTimeout(10, 9));
+  ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 9));
+  ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 59));
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/collector.cc b/src/kudu/collector/collector.cc
new file mode 100644
index 0000000..1c930e9
--- /dev/null
+++ b/src/kudu/collector/collector.cc
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector.h"
+
+#include <ostream>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/collector/cluster_rebalancer.h"
+#include "kudu/collector/falcon_reporter.h"
+#include "kudu/collector/local_reporter.h"
+#include "kudu/collector/metrics_collector.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/init.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_string(collector_cluster_name, "",
+              "Cluster name of this collector to operate");
+DEFINE_string(collector_master_addrs, "",
+              "Comma-separated list of Kudu master addresses where each address is of "
+              "form 'hostname:port");
+DEFINE_int32(collector_interval_sec, 60,
+             "Number of interval seconds to collect metrics");
+DEFINE_string(collector_report_method, "",
+              "Which monitor system the metrics reported to. Now supported system: falcon");
+DEFINE_int32(collector_timeout_sec, 10,
+             "Number of seconds to wait for a master, tserver, or CLI tool to return metrics");
+DEFINE_int32(collector_warn_threshold_ms, 1000,
+             "If a task takes more than this number of milliseconds, issue a warning with a "
+             "trace.");
+
+DECLARE_string(principal);
+DECLARE_string(keytab_file);
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+Collector::Collector()
+  : initialized_(false),
+    stop_background_threads_latch_(1) {
+}
+
+Collector::~Collector() {
+  Shutdown();
+}
+
+Status Collector::Init() {
+  CHECK(!initialized_);
+
+  RETURN_NOT_OK(ValidateIntervalAndTimeout(FLAGS_collector_interval_sec,
+                                           FLAGS_collector_timeout_sec));
+  RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
+
+  if (FLAGS_collector_report_method == "falcon") {
+    reporter_.reset(new FalconReporter());
+  } else if (FLAGS_collector_report_method == "local") {
+    reporter_.reset(new LocalReporter());
+  } else {
+    LOG(FATAL) << Substitute("Unsupported FLAGS_collector_report_method $0",
+                             FLAGS_collector_report_method);
+  }
+  CHECK_OK(reporter_->Init());
+  nodes_checker_.reset(new NodesChecker(reporter_));
+  CHECK_OK(nodes_checker_->Init());
+  metrics_collector_.reset(new MetricsCollector(nodes_checker_, reporter_));
+  CHECK_OK(metrics_collector_->Init());
+  cluster_rebalancer_.reset(new ClusterRebalancer());
+  CHECK_OK(cluster_rebalancer_->Init());
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status Collector::Start() {
+  CHECK(initialized_);
+
+  google::FlushLogFiles(google::INFO); // Flush the startup messages.
+
+  RETURN_NOT_OK(StartExcessLogFileDeleterThread());
+
+  reporter_->Start();
+  nodes_checker_->Start();
+  metrics_collector_->Start();
+  cluster_rebalancer_->Start();
+
+  return Status::OK();
+}
+
+void Collector::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    reporter_->Shutdown();
+    metrics_collector_->Shutdown();
+    nodes_checker_->Shutdown();
+    cluster_rebalancer_->Shutdown();
+
+    stop_background_threads_latch_.CountDown();
+
+    if (excess_log_deleter_thread_) {
+      excess_log_deleter_thread_->Join();
+    }
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string Collector::ToString() const {
+  return "Collector";
+}
+
+Status Collector::StartExcessLogFileDeleterThread() {
+  // Try synchronously deleting excess log files once at startup to make sure it
+  // works, then start a background thread to continue deleting them in the
+  // future.
+  if (!FLAGS_logtostderr) {
+    RETURN_NOT_OK_PREPEND(DeleteExcessLogFiles(Env::Default()),
+                          "Unable to delete excess log files");
+  }
+  return Thread::Create("server", "excess-log-deleter", &Collector::ExcessLogFileDeleterThread,
+                        this, &excess_log_deleter_thread_);
+}
+
+void Collector::ExcessLogFileDeleterThread() {
+  // How often to attempt to clean up excess glog files.
+  const MonoDelta kWait = MonoDelta::FromSeconds(60);
+  while (!stop_background_threads_latch_.WaitFor(kWait)) {
+    WARN_NOT_OK(DeleteExcessLogFiles(Env::Default()), "Unable to delete excess log files");
+  }
+}
+
+Status Collector::ValidateIntervalAndTimeout(int interval, int timeout) {
+  if (10 <= interval && interval <= 60 &&
+      0 < timeout && timeout < interval) {
+    return Status::OK();
+  }
+
+  return Status::InvalidArgument(
+      Substitute("Invalid interval '$0'(should in range [10, 60]), "
+                 "or invalid timeout '$1'(should in range (0, interval))", interval, timeout));
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector.h b/src/kudu/collector/collector.h
new file mode 100644
index 0000000..8e4e236
--- /dev/null
+++ b/src/kudu/collector/collector.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+}  // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class ClusterRebalancer;
+class MetricsCollector;
+class NodesChecker;
+class ReporterBase;
+
+class Collector {
+ public:
+  Collector();
+  ~Collector();
+
+  Status Init();
+  Status Start();
+  void Shutdown();
+
+  std::string ToString() const;
+
+ private:
+  FRIEND_TEST(TestCollector, TestValidateIntervalAndTimeout);
+
+  // Start thread to remove excess glog files.
+  Status StartExcessLogFileDeleterThread();
+  void ExcessLogFileDeleterThread();
+
+  static Status ValidateIntervalAndTimeout(int interval, int timeout);
+
+  bool initialized_;
+
+  scoped_refptr<ReporterBase> reporter_;
+  scoped_refptr<MetricsCollector> metrics_collector_;
+  scoped_refptr<NodesChecker> nodes_checker_;
+  scoped_refptr<ClusterRebalancer> cluster_rebalancer_;
+
+  CountDownLatch stop_background_threads_latch_;
+  scoped_refptr<Thread> excess_log_deleter_thread_;
+
+  DISALLOW_COPY_AND_ASSIGN(Collector);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector_main.cc b/src/kudu/collector/collector_main.cc
new file mode 100644
index 0000000..afc0768
--- /dev/null
+++ b/src/kudu/collector/collector_main.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/collector/collector.h"
+#include "kudu/collector/collector_util.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/init.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/version_info.h"
+
+namespace kudu {
+namespace collector {
+
+static int CollectorMain(int argc, char** argv) {
+  InitKuduOrDie();
+
+  GFlagsMap default_flags = GetFlagsMap();
+
+  ParseCommandLineFlags(&argc, &argv, true);
+  if (argc != 1) {
+    std::cerr << "usage: " << argv[0] << std::endl;
+    return 1;
+  }
+  std::string nondefault_flags = GetNonDefaultFlags(default_flags);
+  InitGoogleLoggingSafe(argv[0]);
+
+  LOG(INFO) << "Collector non-default flags:\n"
+            << nondefault_flags << '\n'
+            << "Collector version:\n"
+            << VersionInfo::GetAllVersionInfo();
+
+  Collector collector;
+  LOG(INFO) << "Initializing collector...";
+  CHECK_OK(collector.Init());
+
+  LOG(INFO) << "Starting collector...";
+  CHECK_OK(collector.Start());
+
+  LOG(INFO) << "Collector successfully started.";
+  while (!RunOnceMode()) {
+    SleepFor(MonoDelta::FromSeconds(60));
+  }
+
+  return 0;
+}
+
+} // namespace collector
+} // namespace kudu
+
+int main(int argc, char** argv) {
+  return kudu::collector::CollectorMain(argc, argv);
+}
diff --git a/src/kudu/collector/collector_util-test.cc b/src/kudu/collector/collector_util-test.cc
new file mode 100644
index 0000000..8b29c56
--- /dev/null
+++ b/src/kudu/collector/collector_util-test.cc
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector_util.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+namespace collector {
+
+TEST(TestCollectorUtil, TestExtractHostName) {
+  ASSERT_EQ(ExtractHostName("1.2.3.4:5555"), "1.2.3.4");
+  ASSERT_EQ(ExtractHostName("host-name.bj:5555"), "host-name.bj");
+  ASSERT_EQ(ExtractHostName("1.2.3.4"), "1.2.3.4");
+  ASSERT_EQ(ExtractHostName("host-name.bj"), "host-name.bj");
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/collector_util.cc b/src/kudu/collector/collector_util.cc
new file mode 100644
index 0000000..aa79c40
--- /dev/null
+++ b/src/kudu/collector/collector_util.cc
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions for generating data for use by tools and tests.
+
+#include "kudu/collector/collector_util.h"
+
+#include <stddef.h>
+
+#include <gflags/gflags_declare.h>
+
+DECLARE_string(collector_report_method);
+
+using std::string;
+
+namespace kudu {
+namespace collector {
+
+string ExtractHostName(const string& url) {
+  size_t pos = url.find(':');
+  if (pos == string::npos) {
+    return url;
+  }
+  return url.substr(0, pos);
+}
+
+bool RunOnceMode() {
+  static bool run_once = (FLAGS_collector_report_method == "local");
+  return run_once;
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector_util.h b/src/kudu/collector/collector_util.h
new file mode 100644
index 0000000..f9badc8
--- /dev/null
+++ b/src/kudu/collector/collector_util.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions for generating data for use by tools and tests.
+
+#pragma once
+
+#include <string>
+
+namespace kudu {
+namespace collector {
+
+std::string ExtractHostName(const std::string& url);
+
+bool RunOnceMode();
+
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/falcon_reporter-test.cc b/src/kudu/collector/falcon_reporter-test.cc
new file mode 100644
index 0000000..85810c7
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter-test.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/falcon_reporter.h"
+
+#include <list>
+#include <string>
+#include <utility>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_macros.h"
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_falcon_metrics_version);
+DECLARE_int32(collector_interval_sec);
+
+using std::list;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+TEST(TestFalconReporter, TestSerializeItems) {
+  FLAGS_collector_interval_sec = 30;
+  FLAGS_collector_cluster_name = "test";
+  FLAGS_collector_falcon_metrics_version = 8;
+  scoped_refptr<FalconReporter> reporter(new FalconReporter());
+  list<scoped_refptr<ItemBase>> falcon_items;
+  string data;
+  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+  ASSERT_EQ(data, "");
+
+  falcon_items.emplace_back(reporter->ConstructItem(
+    "tserver1",
+    "scan_count",
+    "host",
+    1234567890,
+    12345,
+    "COUNTER",
+    ""));
+  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+  ASSERT_EQ(data, Substitute(
+                  R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
+                  R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
+                  R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"}])*",
+                  FLAGS_collector_interval_sec,
+                  FLAGS_collector_cluster_name,
+                  FLAGS_collector_falcon_metrics_version));
+
+  falcon_items.emplace_back(reporter->ConstructItem(
+    "table1",
+    "disk_size",
+    "table",
+    1234567891,
+    67890,
+    "GAUGE",
+    ""));
+  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+  ASSERT_EQ(data, Substitute(
+                  R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
+                  R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
+                  R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"},)*"
+                  R"*({"endpoint":"table1","metric":"disk_size","timestamp":1234567891,)*"
+                  R"*("step":$0,"value":67890,"counterType":"GAUGE",)*"
+                  R"*("tags":"service=kudu,cluster=$1,level=table,v=$2"}])*",
+                  FLAGS_collector_interval_sec,
+                  FLAGS_collector_cluster_name,
+                  FLAGS_collector_falcon_metrics_version));
+}
+
+void GenerateItems(const scoped_refptr<FalconReporter>& reporter, int count) {
+  list<scoped_refptr<ItemBase>> items;
+  for (int i = 0; i < count; ++i) {
+    items.emplace_back(reporter->ConstructItem("endpoint", "metric", "level", 0, i, "GAUGE", ""));
+  }
+  reporter->PushItems(std::move(items));
+}
+
+TEST(TestFalconReporter, TestPushAndPopItems) {
+  scoped_refptr<FalconReporter> reporter(new FalconReporter());
+  ASSERT_FALSE(reporter->HasItems());
+  NO_FATALS(GenerateItems(reporter, 1));
+  ASSERT_TRUE(reporter->HasItems());
+  NO_FATALS(GenerateItems(reporter, 9));
+  ASSERT_TRUE(reporter->HasItems());
+
+  list<scoped_refptr<ItemBase>> falcon_items;
+  reporter->PopItems(&falcon_items);
+  ASSERT_FALSE(reporter->HasItems());
+  ASSERT_EQ(falcon_items.size(), 10);
+
+  NO_FATALS(GenerateItems(reporter, 5));
+  ASSERT_TRUE(reporter->HasItems());
+
+  list<scoped_refptr<ItemBase>> falcon_items2;
+  reporter->PopItems(&falcon_items2);
+  ASSERT_FALSE(reporter->HasItems());
+  ASSERT_EQ(falcon_items2.size(), 5);
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/falcon_reporter.cc b/src/kudu/collector/falcon_reporter.cc
new file mode 100644
index 0000000..e492fd7
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter.cc
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/falcon_reporter.h"
+
+#include <kudu/util/curl_util.h>
+#include <stddef.h>
+
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+DEFINE_bool(collector_direct_push, false,
+            "Whether to push collected items to falcon agent directly, "
+            "otherwise items will be cached and then pushed to falcon "
+            "agent asynchronous");
+DEFINE_string(collector_falcon_agent, "http://127.0.0.1:1988/v1/push",
+              "The falcon agent URL to push metrics to");
+DEFINE_int32(collector_falcon_metrics_version, 4,
+             "Version of metrics pushed to falcon, it will be tagged in "
+             "'tag' section of an item");
+DEFINE_int32(collector_falcon_pusher_count, 4,
+             "Thread count to push collected items to falcon agent");
+DEFINE_int32(collector_report_batch_size, 1000,
+            "Count of items will be pushed to falcon agent by batch");
+DEFINE_int32(collector_push_timeout_ms, 20,
+             "Timeout for pushing items to falcon agent");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using std::list;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+FalconReporter::FalconReporter()
+  : initialized_(false),
+    stop_background_threads_latch_(1) {
+}
+
+FalconReporter::~FalconReporter() {
+  Shutdown();
+}
+
+Status FalconReporter::Init() {
+  CHECK(!initialized_);
+
+  // Simple test falcon agent.
+  EasyCurl curl;
+  faststring dst;
+  RETURN_NOT_OK(curl.PostToURL(FLAGS_collector_falcon_agent, "", &dst));
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status FalconReporter::Start() {
+  CHECK(initialized_);
+
+  if (!FLAGS_collector_direct_push) {
+    RETURN_NOT_OK(StartFalconPusherThreadPool());
+  }
+
+  return Status::OK();
+}
+
+void FalconReporter::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    stop_background_threads_latch_.CountDown();
+
+    pusher_thread_pool_->Wait();
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string FalconReporter::ToString() const {
+  return "FalconReporter";
+}
+
+scoped_refptr<ItemBase> FalconReporter::ConstructItem(string endpoint,
+                                                      string metric,
+                                                      string level,
+                                                      uint64_t timestamp,
+                                                      int64_t value,
+                                                      string counter_type,
+                                                      string extra_tags) {
+  scoped_refptr<ItemBase> tmp(new FalconItem(std::move(endpoint),
+                        std::move(metric),
+                        Substitute("service=kudu,cluster=$0,level=$1,v=$2$3",
+                                   FLAGS_collector_cluster_name, level,
+                                   FLAGS_collector_falcon_metrics_version,
+                                   extra_tags.empty() ? "" : "," + extra_tags),
+                        timestamp,
+                        FLAGS_collector_interval_sec,
+                        value,
+                        std::move(counter_type)));
+  return tmp;
+}
+
+Status FalconReporter::PushItems(list<scoped_refptr<ItemBase>> items) {
+  if (FLAGS_collector_direct_push) {
+    RETURN_NOT_OK(PushToAgent(std::move(items)));
+  } else {
+    std::lock_guard<RWMutex> l(items_lock_);
+    buffer_items_.splice(buffer_items_.end(), std::move(items));
+  }
+  return Status::OK();
+}
+
+Status FalconReporter::StartFalconPusherThreadPool() {
+  RETURN_NOT_OK(ThreadPoolBuilder("falcon-pusher")
+      .set_min_threads(FLAGS_collector_falcon_pusher_count)
+      .set_max_threads(FLAGS_collector_falcon_pusher_count)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(1))
+      .Build(&pusher_thread_pool_));
+  for (int i = 0; i < FLAGS_collector_falcon_pusher_count; ++i) {
+    RETURN_NOT_OK(pusher_thread_pool_->SubmitFunc(std::bind(&FalconReporter::FalconPusher,
+                                                            this)));
+  }
+
+  return Status::OK();
+}
+
+void FalconReporter::FalconPusher() {
+  while (HasItems() || !stop_background_threads_latch_.WaitFor(MonoDelta::FromSeconds(1))) {
+    ReportItems();
+  }
+}
+
+void FalconReporter::ReportItems() {
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "FalconReporter::ReportItems");
+  TRACE("init");
+
+  list<scoped_refptr<ItemBase>> falcon_items;
+  PopItems(&falcon_items);
+  WARN_NOT_OK(PushToAgent(std::move(falcon_items)), "PushToAgent failed");
+  int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+}
+
+bool FalconReporter::HasItems() const {
+  std::lock_guard<RWMutex> l(items_lock_);
+  return !buffer_items_.empty();
+}
+
+void FalconReporter::PopItems(list<scoped_refptr<ItemBase>>* falcon_items) {
+  int items_left = 0;
+  CHECK(falcon_items);
+  {
+    std::lock_guard<RWMutex> l(items_lock_);
+    auto end_item = buffer_items_.begin();
+    std::advance(end_item, std::min(buffer_items_.size(),
+                                    static_cast<size_t>(FLAGS_collector_report_batch_size)));
+    falcon_items->splice(falcon_items->end(), buffer_items_, buffer_items_.begin(), end_item);
+    items_left = buffer_items_.size();
+  }
+  if (items_left > 1000000) {
+    LOG(INFO) << "Items left " << items_left << std::endl;
+  }
+  TRACE(Substitute("Pop items, count $0", falcon_items->size()));
+}
+
+Status FalconReporter::PushToAgent(list<scoped_refptr<ItemBase>> falcon_items) {
+  string data;
+  RETURN_NOT_OK(SerializeItems(std::move(falcon_items), &data));
+
+  EasyCurl curl;
+  faststring dst;
+  curl.set_timeout(MonoDelta::FromMilliseconds(FLAGS_collector_push_timeout_ms));
+  RETURN_NOT_OK(curl.PostToURL(FLAGS_collector_falcon_agent, data, &dst));
+  TRACE(Substitute("Pushed items to agent, size $0", data.size()));
+  return Status::OK();
+}
+
+Status FalconReporter::SerializeItems(list<scoped_refptr<ItemBase>> items, string* data) {
+  CHECK(data);
+  if (items.empty()) {
+    return Status::OK();
+  }
+  std::ostringstream str;
+  JsonWriter jw(&str, JsonWriter::COMPACT);
+  jw.StartArray();
+  for (const auto& item : items) {
+    scoped_refptr<FalconItem> falcon_item = dynamic_cast<FalconItem*>(item.get());
+    jw.StartObject();
+    jw.String("endpoint");
+    jw.String(falcon_item->endpoint);
+    jw.String("metric");
+    jw.String(falcon_item->metric);
+    jw.String("timestamp");
+    jw.Uint64(falcon_item->timestamp);
+    jw.String("step");
+    jw.Int(falcon_item->step);
+    jw.String("value");
+    jw.Int64(falcon_item->value);
+    jw.String("counterType");
+    jw.String(falcon_item->counter_type);
+    jw.String("tags");
+    jw.String(falcon_item->tags);
+    jw.EndObject();
+  }
+  jw.EndArray();
+  *data = str.str();
+  TRACE(Substitute("SerializeItems done, count $0, size $1", items.size(), data->size()));
+  return Status::OK();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/falcon_reporter.h b/src/kudu/collector/falcon_reporter.h
new file mode 100644
index 0000000..3b3cde5
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class ThreadPool;
+}  // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+// Open-Falcon is a distributed and high-performance monitoring system,
+// see more details http://open-falcon.com
+struct FalconItem : public ItemBase {
+  FalconItem(std::string ep, std::string m, std::string t,
+             uint64_t ts, int32_t s, int64_t v, std::string ct)
+  : endpoint(std::move(ep)),
+    metric(std::move(m)),
+    tags(std::move(t)),
+    timestamp(ts),
+    step(s),
+    value(v),
+    counter_type(std::move(ct)) {
+  }
+  ~FalconItem() override = default;
+
+  std::string endpoint;
+  std::string metric;
+  std::string tags;
+  uint64_t timestamp;
+  int32_t step;
+  int64_t value;
+  std::string counter_type;
+};
+
+class FalconReporter : public ReporterBase {
+ public:
+  FalconReporter();
+  ~FalconReporter() override;
+
+  Status Init() override;
+  Status Start() override;
+  void Shutdown() override;
+
+  std::string ToString() const override;
+
+  scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+                                        std::string metric,
+                                        std::string level,
+                                        uint64_t timestamp,
+                                        int64_t value,
+                                        std::string counter_type,
+                                        std::string extra_tags) override;
+
+  Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;
+
+ private:
+  FRIEND_TEST(TestFalconReporter, TestSerializeItems);
+  FRIEND_TEST(TestFalconReporter, TestPushAndPopItems);
+
+  Status StartFalconPusherThreadPool();
+  void FalconPusher();
+
+  bool HasItems() const;
+  void ReportItems();
+  void PopItems(std::list<scoped_refptr<ItemBase>>* falcon_items);
+  static Status PushToAgent(std::list<scoped_refptr<ItemBase>> falcon_items);
+  static Status SerializeItems(std::list<scoped_refptr<ItemBase>> items, std::string* data);
+
+  bool initialized_;
+
+  CountDownLatch stop_background_threads_latch_;
+  std::unique_ptr<ThreadPool> pusher_thread_pool_;
+
+  mutable RWMutex items_lock_;
+  std::list<scoped_refptr<ItemBase>> buffer_items_;
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/local_reporter.cc b/src/kudu/collector/local_reporter.cc
new file mode 100644
index 0000000..04110bb
--- /dev/null
+++ b/src/kudu/collector/local_reporter.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/local_reporter.h"
+
+#include <iostream>
+
+#include <glog/logging.h>
+
+#include "kudu/util/status.h"
+
+using std::list;
+using std::string;
+
+namespace kudu {
+namespace collector {
+
+LocalReporter::LocalReporter()
+  : initialized_(false) {
+}
+
+LocalReporter::~LocalReporter() {
+  Shutdown();
+}
+
+Status LocalReporter::Init() {
+  CHECK(!initialized_);
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status LocalReporter::Start() {
+  CHECK(initialized_);
+
+  return Status::OK();
+}
+
+void LocalReporter::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string LocalReporter::ToString() const {
+  return "LocalReporter";
+}
+
+scoped_refptr<ItemBase> LocalReporter::ConstructItem(string endpoint,
+                                                     string metric,
+                                                     string level,
+                                                     uint64_t /*timestamp*/,
+                                                     int64_t value,
+                                                     string /*counter_type*/,
+                                                     string extra_tags) {
+  MutexLock l(output_lock_);
+  std::cout << level << " " << metric << " " << endpoint << " "
+      << (extra_tags.empty() ? "" : extra_tags + " ") << value << std::endl;
+  return nullptr;
+}
+
+Status LocalReporter::PushItems(list<scoped_refptr<ItemBase>> /*items*/) {
+  return Status::OK();
+}
+
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/local_reporter.h b/src/kudu/collector/local_reporter.h
new file mode 100644
index 0000000..d796b3b
--- /dev/null
+++ b/src/kudu/collector/local_reporter.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <string>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace collector {
+
+class LocalReporter : public ReporterBase {
+ public:
+  LocalReporter();
+  ~LocalReporter() override;
+
+  Status Init() override;
+  Status Start() override;
+  void Shutdown() override;
+
+  std::string ToString() const override;
+
+  scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+                                        std::string metric,
+                                        std::string level,
+                                        uint64_t timestamp,
+                                        int64_t value,
+                                        std::string counter_type,
+                                        std::string extra_tags) override;
+
+  Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;
+
+ private:
+  bool initialized_;
+  Mutex output_lock_;
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/metrics_collector-test.cc b/src/kudu/collector/metrics_collector-test.cc
new file mode 100644
index 0000000..4a3e050
--- /dev/null
+++ b/src/kudu/collector/metrics_collector-test.cc
@@ -0,0 +1,777 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/metrics_collector.h"
+
+#include <stdint.h>
+
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/collector/local_reporter.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+DECLARE_bool(collector_request_merged_metrics);
+DECLARE_string(collector_attributes);
+DECLARE_string(collector_cluster_level_metrics);
+DECLARE_string(collector_metrics);
+DECLARE_string(collector_table_names);
+DECLARE_string(collector_metrics_types_for_test);
+
+using std::map;
+using std::set;
+using std::string;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace collector {
+
+scoped_refptr<MetricsCollector> BuildCollector() {
+  scoped_refptr<ReporterBase> reporter(new LocalReporter());
+  scoped_refptr<NodesChecker> nodes_checker(new NodesChecker(reporter));
+  return new MetricsCollector(nodes_checker, reporter);
+}
+
+TEST(TestMetricsCollector, TestConvertStateToInt) {
+  int64_t result = 1;
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("", &result));
+  ASSERT_EQ(result, 0);
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("STOPPED", &result));
+  ASSERT_EQ(result, 0);
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGSTOPPED", &result));
+  ASSERT_EQ(result, 0);
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGBOOTSTRAPPING", &result));
+  ASSERT_EQ(result, 0);
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNING", &result));
+  ASSERT_EQ(result, 1);
+  ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGRUNNING", &result));
+  ASSERT_EQ(result, 1);
+}
+
+TEST(TestMetricsCollector, TestGetHistValue) {
+  {
+    vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100}});
+    ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 100);
+  }
+  {
+    vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100},
+                                                           {20, 200}});
+    ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 167);
+  }
+}
+
+TEST(TestMetricsCollector, TestMergeToTableLevelMetrics) {
+  // Merge empty metrics.
+  {
+    vector<MetricsCollector::TablesMetrics> hosts_tables_metrics;
+    vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics;
+    MetricsCollector::TablesMetrics tables_metrics;
+    MetricsCollector::TablesHistMetrics tables_hist_metrics;
+    ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics(
+      hosts_tables_metrics, hosts_tables_hist_metrics,
+      &tables_metrics, &tables_hist_metrics));
+    ASSERT_TRUE(tables_metrics.empty());
+    ASSERT_TRUE(tables_hist_metrics.empty());
+  }
+  // Merge multi metrics.
+  {
+    vector<MetricsCollector::TablesMetrics> hosts_tables_metrics{
+        {  // host-1
+          {
+            "table1",
+            {
+              {"metric1", 1},
+              {"metric2", 2}
+            }
+          },
+          {
+            "table2",
+            {
+              {"metric1", 100},
+              {"metric3", 200}
+            }
+          }
+        },
+        {  // host-2
+          {
+            "table1",
+            {
+              {"metric1", 100},
+              {"metric2", 200}
+            }
+          },
+          {
+            "table2",
+            {
+              {"metric1", 1},
+              {"metric2", 2}
+            }
+          },
+          {
+            "table3",
+            {
+              {"metric1", 1},
+              {"metric2", 2}
+            }
+          }
+        }
+    };
+    vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics{
+        {  // host-1
+          {
+            "table1",
+            {
+              {
+                "metric3",
+                {
+                  {10, 100},
+                  {20, 200}
+                }
+              },
+              {
+                "metric4",
+                {
+                  {30, 300},
+                  {40, 400}
+                }
+              }
+            }
+          },
+          {
+            "table2",
+            {
+              {
+                "metric3",
+                {
+                  {10, 200},
+                  {20, 300}
+                }
+              },
+              {
+                "metric4",
+                {
+                  {40, 300},
+                  {50, 400}
+                }
+              }
+            }
+          }
+        },
+        {  // host-2
+          {
+            "table1",
+            {
+              {
+                "metric3",
+                {
+                  {10, 100},
+                  {20, 200}
+                }
+              },
+              {
+                "metric4",
+                {
+                  {30, 300},
+                  {40, 400}
+                }
+              }
+            }
+          },
+          {
+            "table2",
+            {
+              {
+                "metric3",
+                {
+                  {10, 200},
+                  {20, 300}
+                }
+              },
+              {
+                "metric4",
+                {
+                  {40, 300},
+                  {50, 400}
+                }
+              }
+            }
+          },
+          {
+            "table3",
+            {
+              {
+                "metric3",
+                {
+                  {10, 200},
+                  {20, 300}
+                }
+              },
+              {
+                "metric4",
+                {
+                  {40, 300},
+                  {50, 400}
+                }
+              }
+            }
+          }
+        }
+    };
+    MetricsCollector::TablesMetrics tables_metrics;
+    MetricsCollector::TablesHistMetrics tables_hist_metrics;
+    ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics(
+        hosts_tables_metrics, hosts_tables_hist_metrics,
+        &tables_metrics, &tables_hist_metrics));
+    ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+        {
+          "table1",
+          {
+            {"metric1", 101},
+            {"metric2", 202}
+          }
+        },
+        {
+          "table2",
+          {
+            {"metric1", 101},
+            {"metric2", 2},
+            {"metric3", 200},
+          }
+        },
+        {
+          "table3",
+          {
+            {"metric1", 1},
+            {"metric2", 2}
+          }
+        }
+    }));
+    ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+        {
+          "table1",
+          {
+            {
+              "metric3",
+              {
+                {10, 100},
+                {20, 200},
+                {10, 100},
+                {20, 200}
+              }
+            },
+            {
+              "metric4",
+              {
+                {30, 300},
+                {40, 400},
+                {30, 300},
+                {40, 400}
+              }
+            }
+          }
+        },
+        {
+          "table2",
+          {
+            {
+              "metric3",
+              {
+                {10, 200},
+                {20, 300},
+                {10, 200},
+                {20, 300}
+              }
+            },
+            {
+              "metric4",
+              {
+                {40, 300},
+                {50, 400},
+                {40, 300},
+                {50, 400}
+              }
+            }
+          }
+        },
+        {
+          "table3",
+          {
+            {
+              "metric3",
+              {
+                {10, 200},
+                {20, 300}
+              }
+            },
+            {
+              "metric4",
+              {
+                {40, 300},
+                {50, 400}
+              }
+            }
+          }
+        }
+    }));
+  }
+}
+
+TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics) {
+  // Merge empty metrics.
+  {
+    MetricsCollector::TablesMetrics tables_metrics;
+    MetricsCollector::TablesHistMetrics tables_hist_metrics;
+    MetricsCollector::Metrics cluster_metrics;
+    ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics,
+                                                           &cluster_metrics));
+    ASSERT_TRUE(cluster_metrics.empty());
+  }
+  // Merge multi metrics.
+  {
+    MetricsCollector::TablesMetrics tables_metrics(
+        {
+          {
+            "table1",
+            {
+              {"metric1", 100}
+            }
+          },
+          {
+            "table2",
+            {
+              {"metric1", 10},
+              {"metric2", 20}
+            }
+          },
+          {
+            "table3",
+            {
+              {"metric1", 1},
+              {"metric2", 2},
+              {"metric3", 3}
+            }
+          }
+        }
+    );
+    MetricsCollector::TablesHistMetrics tables_hist_metrics;  // TODO(yingchun) not used now.
+    MetricsCollector::Metrics cluster_metrics({{"metric2", 0}});
+    ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics,
+                                                           &cluster_metrics));
+    ASSERT_EQ(cluster_metrics, MetricsCollector::Metrics({
+        {
+          {"metric2", 22}
+        }
+    }));
+  }
+}
+
+TEST(TestMetricsCollector, TestParseMetrics) {
+  // Check ParseServerMetrics and ParseTabletMetrics.
+  {
+    string data;
+    JsonReader r(data);
+    const rapidjson::Value entity;
+    ASSERT_TRUE(MetricsCollector::ParseServerMetrics(r, &entity).IsNotSupported());
+    ASSERT_TRUE(MetricsCollector::ParseTabletMetrics(r, &entity).IsNotSupported());
+  }
+  // Check ParseTableMetrics.
+  {
+    auto collector = BuildCollector();
+    collector->metric_types_by_entity_type_["tablet"] = {
+        {"test_metric", "COUNTER"},
+        {"metric_counter1", "COUNTER"},
+        {"metric_counter2", "COUNTER"},
+        {"metric_histogram1", "HISTOGRAM"},
+        {"metric_histogram2", "HISTOGRAM"}
+    };
+    string data(
+        R"*([                                             )*"
+        R"*(  {                                           )*"
+        R"*(    "type": "server",                         )*"
+        R"*(    "id": "server1",                          )*"
+        R"*(    "attributes": {                           )*"
+        R"*(      "attrA": "val1",                        )*"
+        R"*(      "attrB": "val2"                         )*"
+        R"*(    },                                        )*"
+        R"*(    "metrics": [                              )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "test_metric",                )*"
+        R"*(        "value": 123                          )*"
+        R"*(      }                                       )*"
+        R"*(    ]                                         )*"
+        R"*(  },                                          )*"
+        R"*(  {                                           )*"
+        R"*(    "type": "tablet",                         )*"
+        R"*(    "id": "tablet1",                          )*"
+        R"*(    "attributes": {                           )*"
+        R"*(      "attr1": "val1",                        )*"
+        R"*(      "attr2": "val2"                         )*"
+        R"*(    },                                        )*"
+        R"*(    "metrics": [                              )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "test_metric",                )*"
+        R"*(        "value": 321                          )*"
+        R"*(      }                                       )*"
+        R"*(    ]                                         )*"
+        R"*(  },                                          )*"
+        R"*(  {                                           )*"
+        R"*(    "type": "table",                          )*"
+        R"*(    "id": "table1",                           )*"
+        R"*(    "attributes": {                           )*"
+        R"*(      "attr1": "val2",                        )*"
+        R"*(      "attr2": "val3"                         )*"
+        R"*(    },                                        )*"
+        R"*(    "metrics": [                              )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_counter1",            )*"
+        R"*(        "value": 10                           )*"
+        R"*(      },                                      )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_counter2",            )*"
+        R"*(        "value": 20                           )*"
+        R"*(      },                                      )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_histogram1",          )*"
+        R"*(        "total_count": 17,                    )*"
+        R"*(        "min": 6,                             )*"
+        R"*(        "mean": 47.8235,                      )*"
+        R"*(        "percentile_75": 62,                  )*"
+        R"*(        "percentile_95": 72,                  )*"
+        R"*(        "percentile_99": 73,                  )*"
+        R"*(        "percentile_99_9": 73,                )*"
+        R"*(        "percentile_99_99": 73,               )*"
+        R"*(        "max": 73,                            )*"
+        R"*(        "total_sum": 813                      )*"
+        R"*(      }                                       )*"
+        R"*(    ]                                         )*"
+        R"*(  },                                          )*"
+        R"*(  {                                           )*"
+        R"*(    "type": "table",                          )*"
+        R"*(    "id": "table2",                           )*"
+        R"*(    "attributes": {                           )*"
+        R"*(      "attr1": "val3",                        )*"
+        R"*(      "attr2": "val2"                         )*"
+        R"*(    },                                        )*"
+        R"*(    "metrics": [                              )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_counter1",            )*"
+        R"*(        "value": 100                          )*"
+        R"*(      },                                      )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_histogram1",          )*"
+        R"*(        "total_count": 170,                   )*"
+        R"*(        "min": 60,                            )*"
+        R"*(        "mean": 478.235,                      )*"
+        R"*(        "percentile_75": 620,                 )*"
+        R"*(        "percentile_95": 720,                 )*"
+        R"*(        "percentile_99": 730,                 )*"
+        R"*(        "percentile_99_9": 735,               )*"
+        R"*(        "percentile_99_99": 735,              )*"
+        R"*(        "max": 735,                           )*"
+        R"*(        "total_sum": 8130                     )*"
+        R"*(      },                                      )*"
+        R"*(      {                                       )*"
+        R"*(        "name": "metric_histogram2",          )*"
+        R"*(        "total_count": 34,                    )*"
+        R"*(        "min": 6,                             )*"
+        R"*(        "mean": 47.8235,                      )*"
+        R"*(        "percentile_75": 62,                  )*"
+        R"*(        "percentile_95": 72,                  )*"
+        R"*(        "percentile_99": 72,                  )*"
+        R"*(        "percentile_99_9": 73,                )*"
+        R"*(        "percentile_99_99": 73,               )*"
+        R"*(        "max": 73,                            )*"
+        R"*(        "total_sum": 813                      )*"
+        R"*(      }                                       )*"
+        R"*(    ]                                         )*"
+        R"*(  }                                           )*"
+        R"*(]                                             )*");
+
+    // Attribute filter is empty.
+    {
+      MetricsCollector::TablesMetrics tables_metrics;
+      MetricsCollector::TablesHistMetrics tables_hist_metrics;
+      MetricsCollector::Metrics host_metrics;
+      MetricsCollector::HistMetrics host_hist_metrics;
+      ASSERT_OK(collector->ParseMetrics(data,
+                                       &tables_metrics, &host_metrics,
+                                       &tables_hist_metrics, &host_hist_metrics));
+      ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+          {
+            "table1",
+            {
+              {"metric_counter1", 10},
+              {"metric_counter2", 20},
+            }
+          },
+          {
+            "table2",
+            {
+              {"metric_counter1", 100}
+            }
+          }
+      }));
+      ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+          {
+            "table1",
+            {
+              {
+                "metric_histogram1_percentile_99",
+                {
+                  {17, 73}
+                }
+              }
+            }
+          },
+          {
+            "table2",
+            {
+              {
+                "metric_histogram1_percentile_99",
+                {
+                  {170, 730}
+                }
+              },
+              {
+                "metric_histogram2_percentile_99",
+                {
+                  {34, 72}
+                }
+              }
+            }
+          }
+      }));
+      ASSERT_EQ(host_metrics, MetricsCollector::Metrics({
+          {"metric_counter1", 110},
+          {"metric_counter2", 20}
+      }));
+      ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({
+          {
+            "metric_histogram1_percentile_99",
+            {
+              {17, 73},
+              {170, 730}
+            }
+          },
+          {
+            "metric_histogram2_percentile_99",
+            {
+              {34, 72}
+            }
+          }
+      }));
+    }
+
+    // Attribute filter is not empty.
+    {
+      collector->attributes_filter_ = {{"attr1", {"val1", "val2"}}};
+
+      MetricsCollector::TablesMetrics tables_metrics;
+      MetricsCollector::TablesHistMetrics tables_hist_metrics;
+      MetricsCollector::Metrics host_metrics;
+      MetricsCollector::HistMetrics host_hist_metrics;
+      ASSERT_OK(collector->ParseMetrics(data,
+                                       &tables_metrics, &host_metrics,
+                                       &tables_hist_metrics, &host_hist_metrics));
+      ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+          {
+            "table1",
+            {
+              {"metric_counter1", 10},
+              {"metric_counter2", 20},
+            }
+          }
+      }));
+      ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+          {
+            "table1",
+            {
+              {
+                "metric_histogram1_percentile_99",
+                {
+                  {17, 73}
+                }
+              }
+            }
+          }
+      }));
+      ASSERT_EQ(host_metrics, MetricsCollector::Metrics({
+          {"metric_counter1", 10},
+          {"metric_counter2", 20}
+      }));
+      ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({
+          {
+            "metric_histogram1_percentile_99",
+            {
+              {17, 73},
+            }
+          }
+      }));
+    }
+  }
+}
+
+TEST(TestMetricsCollector, TestInitMetrics) {
+  FLAGS_collector_metrics_types_for_test =
+      R"*([                                                       )*"
+      R"*(  {                                                     )*"
+      R"*(    "type": "tablet",                                   )*"
+      R"*(    "id": "table1",                                     )*"
+      R"*(    "metrics": [                                        )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "counter_metric1",                      )*"
+      R"*(        "type": "counter"                               )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "histogram_metric1",                    )*"
+      R"*(        "type": "histogram"                             )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "gauge_metric1",                        )*"
+      R"*(        "type": "gauge"                                 )*"
+      R"*(      }                                                 )*"
+      R"*(    ]                                                   )*"
+      R"*(  },                                                    )*"
+      R"*(  {                                                     )*"
+      R"*(    "type": "tablet",                                   )*"
+      R"*(    "id": "table2",                                     )*"
+      R"*(    "metrics": [                                        )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "counter_metric1",                      )*"
+      R"*(        "type": "counter"                               )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "histogram_metric1",                    )*"
+      R"*(        "type": "histogram"                             )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "gauge_metric1",                        )*"
+      R"*(        "type": "gauge"                                 )*"
+      R"*(      }                                                 )*"
+      R"*(    ]                                                   )*"
+      R"*(  },                                                    )*"
+      R"*(  {                                                     )*"
+      R"*(    "type": "server",                                   )*"
+      R"*(    "metrics": [                                        )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "counter_metric2",                      )*"
+      R"*(        "type": "counter"                               )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "histogram_metric2",                    )*"
+      R"*(        "type": "histogram"                             )*"
+      R"*(      },                                                )*"
+      R"*(      {                                                 )*"
+      R"*(        "name": "gauge_metric2",                        )*"
+      R"*(        "type": "gauge"                                 )*"
+      R"*(      }                                                 )*"
+      R"*(    ]                                                   )*"
+      R"*(  }                                                     )*"
+      R"*(]                                                       )*";
+  auto collector = BuildCollector();
+  ASSERT_OK(collector->InitMetrics());
+  map<string, MetricsCollector::MetricTypes> expect_metric_types({
+      {
+        "tablet",
+        {
+          {"counter_metric1", "COUNTER"},
+          {"histogram_metric1", "HISTOGRAM"},
+          {"gauge_metric1", "GAUGE"},
+        }
+      },
+      {
+        "server",
+        {
+          {"counter_metric2", "COUNTER"},
+          {"histogram_metric2", "HISTOGRAM"},
+          {"gauge_metric2", "GAUGE"},
+        }
+      }
+  });
+  ASSERT_EQ(collector->metric_types_by_entity_type_, expect_metric_types);
+}
+
+TEST(TestMetricsCollector, TestInitFilters) {
+  FLAGS_collector_attributes = "attr1:val1,val2;attr2:val1";
+  auto collector = BuildCollector();
+  ASSERT_OK(collector->InitFilters());
+  unordered_map<string, set<string>> expect_attributes_filter({
+      {
+        "attr1",
+        {"val1", "val2"}
+      },
+      {
+        "attr2",
+        {"val1"}
+      }
+  });
+  ASSERT_EQ(collector->attributes_filter_, expect_attributes_filter);
+}
+
+#define CHECK_URL_PARAMETERS(metrics, request_merged, attributes, table_names, expect_url)        \
+do {                                                                                              \
+  FLAGS_collector_metrics = metrics;                                                              \
+  FLAGS_collector_request_merged_metrics = request_merged;                                        \
+  FLAGS_collector_attributes = attributes;                                                        \
+  FLAGS_collector_table_names = table_names;                                                      \
+  auto collector = BuildCollector();                                                              \
+  ASSERT_OK(collector->InitFilters());                                                            \
+  ASSERT_OK(collector->InitMetricsUrlParameters());                                               \
+  ASSERT_EQ(collector->metric_url_parameters_, expect_url);                                       \
+} while (false)
+
+TEST(TestMetricsCollector, TestInitMetricsUrlParameters) {
+  CHECK_URL_PARAMETERS("", true, "", "",
+      "/metrics?compact=1&origin=false&merge=true");
+  CHECK_URL_PARAMETERS("m1,m2,m3", true, "", "",
+      "/metrics?compact=1&metrics=m1,m2,m3&origin=false&merge=true");
+  // TODO(yingchun): now FLAGS_collector_request_merged_metrics must be true
+  //CHECK_URL_PARAMETERS("", false, "", "",
+  //    "/metrics?compact=1");
+  CHECK_URL_PARAMETERS("", true, "attr1:a1,a2;attr2:a3", "",
+      "/metrics?compact=1&origin=false&merge=true&attributes=attr2,a3,attr1,a1,attr1,a2,");
+  CHECK_URL_PARAMETERS("", true, "", "t1,t2,t3",
+      "/metrics?compact=1&origin=false&merge=true&table_names=t1,t2,t3");
+}
+
+TEST(TestMetricsCollector, TestInitClusterLevelMetrics) {
+  FLAGS_collector_cluster_level_metrics = "m1,m2,m3";
+  auto collector = BuildCollector();
+  ASSERT_OK(collector->InitClusterLevelMetrics());
+  MetricsCollector::Metrics cluster_metrics({
+      {"m1", 0},
+      {"m2", 0},
+      {"m3", 0},
+  });
+  ASSERT_EQ(collector->cluster_metrics_, cluster_metrics);
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/metrics_collector.cc b/src/kudu/collector/metrics_collector.cc
new file mode 100644
index 0000000..e5adfcb
--- /dev/null
+++ b/src/kudu/collector/metrics_collector.cc
@@ -0,0 +1,852 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/metrics_collector.h"
+
+#include <string.h>
+
+#include <cmath>
+#include <functional>
+#include <list>
+#include <ostream>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/zlib.h"
+
+DEFINE_string(collector_attributes, "",
+              "Entity attributes to collect (semicolon-separated list of entity attribute "
+              "name and values). e.g. attr_name1:attr_val1,attr_val2;attr_name2:attr_val3");
+DEFINE_string(collector_cluster_level_metrics, "on_disk_size,on_disk_data_size",
+              "Metric names which should be merged and pushed to cluster level view "
+              "(comma-separated list of metric names)");
+DEFINE_bool(collector_ignore_hosttable_level_metrics, false,
+            "Whether to ignore to report host-table level metrics.");
+DEFINE_string(collector_metrics, "",
+              "Metrics to collect (comma-separated list of metric names)");
+DEFINE_string(collector_metrics_types_for_test, "",
+              "Only for test, used to initialize metric_types_by_entity_type_");
+DEFINE_bool(collector_request_merged_metrics, true,
+            "Whether to request merged metrics and exclude unmerged metrics from server");
+DEFINE_string(collector_table_names, "",
+              "Table names to collect (comma-separated list of table names)");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using rapidjson::Value;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
+using std::unordered_map;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+const set<string> MetricsCollector::kRegisterPercentiles = {"percentile_99"};
+
+MetricsCollector::MetricsCollector(scoped_refptr<NodesChecker> nodes_checker,
+                                   scoped_refptr<ReporterBase> reporter)
+  : initialized_(false),
+    nodes_checker_(std::move(nodes_checker)),
+    reporter_(std::move(reporter)),
+    stop_background_threads_latch_(1) {
+}
+
+MetricsCollector::~MetricsCollector() {
+  Shutdown();
+}
+
+Status MetricsCollector::Init() {
+  CHECK(!initialized_);
+
+  RETURN_NOT_OK(ValidateTableFilter(FLAGS_collector_attributes, FLAGS_collector_table_names));
+  RETURN_NOT_OK(InitMetrics());
+  RETURN_NOT_OK(InitFilters());
+  CHECK(attributes_filter_.empty());  // TODO(yingchun) disable now
+  RETURN_NOT_OK(InitMetricsUrlParameters());
+  RETURN_NOT_OK(InitClusterLevelMetrics());
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status MetricsCollector::Start() {
+  CHECK(initialized_);
+
+  RETURN_NOT_OK(StartMetricCollectorThread());
+
+  return Status::OK();
+}
+
+void MetricsCollector::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    stop_background_threads_latch_.CountDown();
+
+    if (metric_collector_thread_) {
+      metric_collector_thread_->Join();
+    }
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string MetricsCollector::ToString() const {
+  return "MetricsCollector";
+}
+
+Status MetricsCollector::StartMetricCollectorThread() {
+  return Thread::Create("server", "metric-collector", &MetricsCollector::MetricCollectorThread,
+                        this, &metric_collector_thread_);
+}
+
+void MetricsCollector::MetricCollectorThread() {
+  MonoTime collect_time;
+  do {
+    collect_time = MonoTime::Now();
+    WARN_NOT_OK(CollectAndReportMetrics(), "Unable to collect metrics");
+    collect_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
+  } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(collect_time));
+  LOG(INFO) << "MetricCollectorThread exit";
+}
+
+Status MetricsCollector::UpdateThreadPool(int32_t thread_count) {
+  if (host_metric_collector_thread_pool_ &&
+      host_metric_collector_thread_pool_->num_threads() == thread_count) {
+    return Status::OK();
+  }
+
+  if (host_metric_collector_thread_pool_) {
+    host_metric_collector_thread_pool_->Shutdown();
+  }
+  TRACE("Old thread pool shutdown");
+
+  RETURN_NOT_OK(ThreadPoolBuilder("host-metric-collector")
+      .set_min_threads(thread_count)
+      .set_max_threads(thread_count)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(1))
+      .Build(&host_metric_collector_thread_pool_));
+  TRACE("New thread pool built");
+
+  return Status::OK();
+}
+
+Status MetricsCollector::ValidateTableFilter(const string& attribute_filter,
+                                             const string& /*table_filter*/) {
+  if (attribute_filter.empty()) {
+    return Status::OK();
+  }
+
+  return Status::InvalidArgument("attribute filter is not supported now");
+}
+
+Status MetricsCollector::InitMetrics() {
+  string resp;
+  if (PREDICT_TRUE(FLAGS_collector_metrics_types_for_test.empty())) {
+    RETURN_NOT_OK(GetMetrics(
+        nodes_checker_->GetFirstMaster() + "/metrics?include_schema=1", &resp));
+  } else {
+    resp = FLAGS_collector_metrics_types_for_test;
+  }
+  JsonReader r(resp);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+
+  map<string, MetricTypes> metric_types_by_entity_type;
+  bool tablet_entity_inited = false;
+  bool server_entity_inited = false;
+  for (const Value* entity : entities) {
+    string entity_type;
+    CHECK_OK(r.ExtractString(entity, "type", &entity_type));
+    if (entity_type == "tablet") {
+      if (tablet_entity_inited) continue;
+      EmplaceOrDie(&metric_types_by_entity_type, std::make_pair("tablet", MetricTypes()));
+      auto& tablet_metric_types = FindOrDie(metric_types_by_entity_type, "tablet");
+      ExtractMetricTypes(r, entity, &tablet_metric_types);
+      tablet_entity_inited = true;
+    } else if (entity_type == "server") {
+      if (server_entity_inited) continue;
+      EmplaceOrDie(&metric_types_by_entity_type, std::make_pair("server", MetricTypes()));
+      auto& server_metric_types = FindOrDie(metric_types_by_entity_type, "server");
+      ExtractMetricTypes(r, entity, &server_metric_types);
+      server_entity_inited = true;
+    } else {
+      LOG(WARNING) << "unhandled entity type " << entity_type;
+    }
+  }
+  metric_types_by_entity_type_.swap(metric_types_by_entity_type);
+  return Status::OK();
+}
+
+Status MetricsCollector::ExtractMetricTypes(const JsonReader& r,
+                                            const Value* entity,
+                                            MetricTypes* metric_types) {
+  CHECK(metric_types);
+  vector<const Value*> metrics;
+  RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
+  for (const Value* metric : metrics) {
+    string name;
+    RETURN_NOT_OK(r.ExtractString(metric, "name", &name));
+    string type;
+    RETURN_NOT_OK(r.ExtractString(metric, "type", &type));
+    string upper_type;
+    ToUpperCase(type, &upper_type);
+    EmplaceOrDie(metric_types, std::make_pair(name, upper_type));
+  }
+  return Status::OK();
+}
+
+Status MetricsCollector::InitFilters() {
+  unordered_map<string, set<string>> attributes_filter;
+  vector<string> attribute_values_by_name_list =
+      Split(FLAGS_collector_attributes, ";", strings::SkipEmpty());
+  for (const auto& attribute_values_by_name : attribute_values_by_name_list) {
+    vector<string> attribute_name_and_values =
+        Split(attribute_values_by_name, ":", strings::SkipEmpty());
+    CHECK_EQ(attribute_name_and_values.size(), 2);
+    set<string> values(Split(attribute_name_and_values[1], ",", strings::SkipEmpty()));
+    CHECK(!values.empty());
+    EmplaceOrDie(&attributes_filter, std::make_pair(attribute_name_and_values[0], values));
+  }
+  attributes_filter_.swap(attributes_filter);
+  return Status::OK();
+}
+
+Status MetricsCollector::InitMetricsUrlParameters() {
+  metric_url_parameters_ = "/metrics?compact=1";
+  if (!FLAGS_collector_metrics.empty()) {
+    metric_url_parameters_ += "&metrics=" + FLAGS_collector_metrics;
+  }
+  if (FLAGS_collector_request_merged_metrics) {
+    metric_url_parameters_ += "&origin=false&merge=true";
+  } else {
+    LOG(FATAL) << "Non-merge mode is not supported now, you should set "
+                  "FLAGS_collector_request_merged_metrics to true if you "
+                  "want collector work well";
+  }
+
+  // TODO(yingchun) This is supported since version 1.10
+  if (!attributes_filter_.empty()) {
+    metric_url_parameters_ += "&attributes=";
+  }
+  for (const auto& attribute_filter : attributes_filter_) {
+    for (const auto& value : attribute_filter.second) {
+      metric_url_parameters_ += Substitute("$0,$1,", attribute_filter.first, value);
+    }
+  }
+  // TODO(yingchun) This is supported since internal version 1.8.0
+  if (!FLAGS_collector_table_names.empty()) {
+    metric_url_parameters_ += "&table_names=" + FLAGS_collector_table_names;
+  }
+  return Status::OK();
+}
+
+Status MetricsCollector::InitClusterLevelMetrics() {
+  Metrics cluster_metrics;
+  vector<string> metric_names =
+      Split(FLAGS_collector_cluster_level_metrics, ",", strings::SkipEmpty());
+  for (const auto& metric_name : metric_names) {
+    cluster_metrics[metric_name] = 0;
+  }
+  cluster_metrics_.swap(cluster_metrics);
+  return Status::OK();
+}
+
+Status MetricsCollector::CollectAndReportMetrics() {
+  LOG(INFO) << "Start to CollectAndReportMetrics";
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "MetricsCollector::CollectAndReportMetrics");
+  TRACE("init");
+  vector<string> tserver_http_addrs = nodes_checker_->GetNodes();
+  TRACE("Nodes got");
+  if (tserver_http_addrs.empty()) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(UpdateThreadPool(static_cast<int32_t>(tserver_http_addrs.size())));
+  vector<TablesMetrics> hosts_metrics_by_table_name(tserver_http_addrs.size());
+  vector<TablesHistMetrics> hosts_hist_metrics_by_table_name(tserver_http_addrs.size());
+  for (int i = 0; i < tserver_http_addrs.size(); ++i) {
+    RETURN_NOT_OK(host_metric_collector_thread_pool_->SubmitFunc(
+      std::bind(&MetricsCollector::CollectAndReportHostLevelMetrics,
+                this,
+                tserver_http_addrs[i] + metric_url_parameters_,
+                &hosts_metrics_by_table_name[i],
+                &hosts_hist_metrics_by_table_name[i])));
+  }
+  TRACE("Thead pool jobs submitted");
+  host_metric_collector_thread_pool_->Wait();
+  TRACE("Thead pool jobs done");
+
+  // Merge to table level metrics.
+  TablesMetrics metrics_by_table_name;
+  TablesHistMetrics hist_metrics_by_table_name;
+  RETURN_NOT_OK(MergeToTableLevelMetrics(hosts_metrics_by_table_name,
+                                         hosts_hist_metrics_by_table_name,
+                                         &metrics_by_table_name,
+                                         &hist_metrics_by_table_name));
+
+  // Merge to cluster level metrics.
+  Metrics cluster_metrics(cluster_metrics_);
+  RETURN_NOT_OK(MergeToClusterLevelMetrics(metrics_by_table_name,
+                                           hist_metrics_by_table_name,
+                                           &cluster_metrics));
+
+  auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  // Push table level metrics.
+  RETURN_NOT_OK(ReportTableLevelMetrics(timestamp,
+                                        metrics_by_table_name,
+                                        hist_metrics_by_table_name));
+
+  // Push cluster level metrics.
+  RETURN_NOT_OK(ReportClusterLevelMetrics(timestamp, cluster_metrics));
+
+  int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+
+  return Status::OK();
+}
+
+Status MetricsCollector::MergeToTableLevelMetrics(
+  const vector<TablesMetrics>& hosts_metrics_by_table_name,
+  const vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name,
+  TablesMetrics* metrics_by_table_name,
+  TablesHistMetrics* hist_metrics_by_table_name) {
+  CHECK(metrics_by_table_name);
+  CHECK(hist_metrics_by_table_name);
+
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& host_metrics_by_table_name : hosts_metrics_by_table_name) {
+    for (const auto& table_metrics1 : host_metrics_by_table_name) {
+      const auto& table_name = table_metrics1.first;
+      const auto& metrics = table_metrics1.second;
+      metrics_count += metrics.size();
+      if (EmplaceIfNotPresent(metrics_by_table_name, std::make_pair(table_name, metrics))) {
+        continue;
+      }
+      // This table has been fetched by some other tserver.
+      auto& table_metrics = FindOrDie(*metrics_by_table_name, table_name);
+      for (const auto& metric_value : metrics) {
+        const auto& metric = metric_value.first;
+        const auto& value = metric_value.second;
+        if (EmplaceIfNotPresent(&table_metrics, std::make_pair(metric, value))) {
+          continue;
+        }
+        // This metric has been fetched by some other tserver.
+        auto& old_value = FindOrDie(table_metrics, metric);
+        old_value += value;
+      }
+    }
+  }
+  TRACE(Substitute("Table GAUGE/COUNTER type metrics merged, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  metrics_count = 0;
+  for (const auto& host_hist_metrics_by_table_name : hosts_hist_metrics_by_table_name) {
+    for (const auto& table_hist_metrics1 : host_hist_metrics_by_table_name) {
+      const auto& table_name = table_hist_metrics1.first;
+      const auto& metrics = table_hist_metrics1.second;
+      metrics_count += metrics.size();
+      if (EmplaceIfNotPresent(hist_metrics_by_table_name, std::make_pair(table_name, metrics))) {
+        continue;
+      }
+      // This table has been fetched by some other tserver.
+      auto& table_hist_metrics = FindOrDie(*hist_metrics_by_table_name, table_name);
+      for (const auto& metric_hist_values : metrics) {
+        const auto& metric = metric_hist_values.first;
+        const auto& hist_values = metric_hist_values.second;
+        if (EmplaceIfNotPresent(&table_hist_metrics, std::make_pair(metric, hist_values))) {
+          continue;
+        }
+        // This metric has been fetched by some other tserver.
+        auto& old_hist_values = FindOrDie(table_hist_metrics, metric);
+        for (auto& hist_value : hist_values) {
+          old_hist_values.emplace_back(hist_value);
+        }
+      }
+    }
+  }
+  TRACE(Substitute("Table HISTOGRAM type metrics merged, count $0", metrics_count));
+
+  return Status::OK();
+}
+
+Status MetricsCollector::MergeToClusterLevelMetrics(
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& /*hist_metrics_by_table_name*/,
+    Metrics* cluster_metrics) {
+  CHECK(cluster_metrics);
+  if (!cluster_metrics->empty()) {
+    for (const auto& table_metrics : metrics_by_table_name) {
+      for (auto& cluster_metric : *cluster_metrics) {
+        auto *find = FindOrNull(table_metrics.second, cluster_metric.first);
+        if (find) {
+          cluster_metric.second += *find;
+        }
+      }
+    }
+  }
+  TRACE(Substitute("Cluster metrics merged, count $0", cluster_metrics->size()));
+
+  return Status::OK();
+}
+
+Status MetricsCollector::GetNumberMetricValue(const rapidjson::Value* metric,
+                                              const string& metric_name /*metric_name*/,
+                                              int64_t* result) const {
+  CHECK(result);
+  if (metric->IsUint64() || metric->IsInt64() || metric->IsUint() || metric->IsInt()) {
+    *result = metric->GetInt64();
+    return Status::OK();
+  }
+
+  if (metric->IsDouble()) {
+    double result_temp = metric->GetDouble();
+    // Multiply by 1000000 and convert to int64_t to avoid much data loss and keep compatibility
+    // with monitor system like Falcon.
+    *result = static_cast<int64_t>(result_temp * 1000000);
+    return Status::OK();
+  }
+
+  return Status::NotSupported(Substitute("unsupported metric $0", metric_name));
+}
+
+Status MetricsCollector::GetStringMetricValue(const Value* metric,
+                                              const string& metric_name,
+                                              int64_t* result) const {
+  CHECK(result);
+  string value(metric->GetString());
+  if (metric_name == "state") {
+    return ConvertStateToInt(value, result);
+  }
+  return Status::NotSupported(Substitute("unsupported metric $0", metric_name));
+}
+
+Status MetricsCollector::ConvertStateToInt(const string& value, int64_t* result) {
+  CHECK(result);
+  // TODO(yingchun) Here, table state is merged by several original tablet states, which is
+  // contacted by several sub-strings, like 'RUNNING', 'BOOTSTRAPPING', etc. It's tricky to
+  // fetch state now, we will improve in server side later.
+  const char* running = "RUNNING";
+  if (value.empty() || value.size() % strlen(running) != 0) {
+    *result = 0;
+    return Status::OK();
+  }
+  for (int i = 0; i < value.size(); i += strlen(running)) {
+    if (0 != strncmp(running, value.c_str() + i, strlen(running))) {
+      *result = 0;
+      return Status::OK();
+    }
+  }
+  *result = 1;
+  return Status::OK();
+}
+
+bool MetricsCollector::FilterByAttribute(const JsonReader& r,
+                                         const rapidjson::Value* entity) const {
+  if (attributes_filter_.empty()) {
+    return false;
+  }
+  const Value* attributes;
+  CHECK_OK(r.ExtractObject(entity, "attributes", &attributes));
+  for (const auto& name_values : attributes_filter_) {
+    string value;
+    Status s = r.ExtractString(attributes, name_values.first.c_str(), &value);
+    if (s.ok() && ContainsKey(name_values.second, value)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+Status MetricsCollector::ParseServerMetrics(const JsonReader& /*r*/,
+                                            const rapidjson::Value* /*entity*/) {
+  return Status::NotSupported("server entity is not supported");
+}
+
+Status MetricsCollector::ParseTableMetrics(const JsonReader& r,
+                                           const rapidjson::Value* entity,
+                                           TablesMetrics* metrics_by_table_name,
+                                           Metrics* host_metrics,
+                                           TablesHistMetrics* hist_metrics_by_table_name,
+                                           HistMetrics* host_hist_metrics) const {
+  CHECK(metrics_by_table_name);
+  CHECK(host_metrics);
+  CHECK(hist_metrics_by_table_name);
+  CHECK(host_hist_metrics);
+
+  string table_name;
+  CHECK_OK(r.ExtractString(entity, "id", &table_name));
+  CHECK(!ContainsKey(*metrics_by_table_name, table_name));
+  CHECK(!ContainsKey(*hist_metrics_by_table_name, table_name));
+
+  EmplaceOrDie(metrics_by_table_name, std::make_pair(table_name, Metrics()));
+  auto& table_metrics = FindOrDie(*metrics_by_table_name, table_name);
+
+  EmplaceOrDie(hist_metrics_by_table_name, std::make_pair(table_name, HistMetrics()));
+  auto& table_hist_metrics = FindOrDie(*hist_metrics_by_table_name, table_name);
+
+  vector<const Value*> metrics;
+  CHECK_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
+  for (const Value* metric : metrics) {
+    string name;
+    CHECK_OK(r.ExtractString(metric, "name", &name));
+    const auto* tablet_metric_types = FindOrNull(metric_types_by_entity_type_, "tablet");
+    CHECK(tablet_metric_types);
+    const auto* known_type = FindOrNull(*tablet_metric_types, name);
+    if (!known_type) {
+      LOG(ERROR) << Substitute("metric $0 has unknown type, ignore it", name);
+      continue;
+    }
+
+    if (*known_type == "GAUGE" || *known_type ==  "COUNTER") {
+      int64_t value = 0;
+      const Value* val;
+      RETURN_NOT_OK(r.ExtractField(metric, "value", &val));
+      rapidjson::Type type = val->GetType();
+      switch (type) {
+        case rapidjson::Type::kStringType:
+          CHECK_OK(GetStringMetricValue(val, name, &value));
+          break;
+        case rapidjson::Type::kNumberType:
+          CHECK_OK(GetNumberMetricValue(val, name, &value));
+          break;
+        default:
+          LOG(FATAL) << "Unknown type, metrics name: " << name;
+      }
+
+      EmplaceOrDie(&table_metrics, std::make_pair(name, value));
+      if (!EmplaceIfNotPresent(host_metrics, std::make_pair(name, value))) {
+        auto& host_metric = FindOrDie(*host_metrics, name);
+        host_metric += value;
+      }
+    } else if (*known_type == "HISTOGRAM") {
+      for (const auto& percentile : kRegisterPercentiles) {
+        string hist_metric_name(name);
+        hist_metric_name += "_" + percentile;
+        int64_t total_count;
+        CHECK_OK(r.ExtractInt64(metric, "total_count", &total_count));
+        int64_t percentile_value;
+        CHECK_OK(r.ExtractInt64(metric, percentile.c_str(), &percentile_value));
+        vector<SimpleHistogram> tmp({{total_count, percentile_value}});
+        EmplaceOrDie(&table_hist_metrics, std::make_pair(hist_metric_name, tmp));
+        if (!EmplaceIfNotPresent(host_hist_metrics, std::make_pair(hist_metric_name, tmp))) {
+          auto& host_hist_metric = FindOrDie(*host_hist_metrics, hist_metric_name);
+          host_hist_metric.emplace_back(tmp[0]);
+        }
+      }
+    } else {
+      LOG(FATAL) << "Unknown metric type: " << *known_type;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status MetricsCollector::ParseTabletMetrics(const JsonReader& /*r*/,
+                                            const rapidjson::Value* /*entity*/) {
+  return Status::NotSupported("tablet entity is not supported");
+}
+
+Status MetricsCollector::CollectAndReportHostLevelMetrics(
+    const string& url,
+    TablesMetrics* metrics_by_table_name,
+    TablesHistMetrics* hist_metrics_by_table_name) {
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT1("collector", "MetricsCollector::CollectAndReportHostLevelMetrics",
+               "url", url);
+  TRACE("init");
+  CHECK(metrics_by_table_name);
+  CHECK(hist_metrics_by_table_name);
+
+  // Get metrics from server.
+  string resp;
+  RETURN_NOT_OK(GetMetrics(url, &resp));
+
+  // Merge metrics by table and metric type.
+  Metrics host_metrics;
+  HistMetrics host_hist_metrics;
+  RETURN_NOT_OK(ParseMetrics(resp, metrics_by_table_name, &host_metrics,
+                             hist_metrics_by_table_name, &host_hist_metrics));
+
+  string host_name = ExtractHostName(url);
+  auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  // Host table level.
+  if (!FLAGS_collector_ignore_hosttable_level_metrics) {
+    RETURN_NOT_OK(ReportHostTableLevelMetrics(host_name, timestamp,
+                                              *metrics_by_table_name,
+                                              *hist_metrics_by_table_name));
+  }
+
+  // Host level.
+  RETURN_NOT_OK(ReportHostLevelMetrics(host_name, timestamp,
+                                       host_metrics,
+                                       host_hist_metrics));
+
+  int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+  return Status::OK();
+}
+
+Status MetricsCollector::ParseMetrics(const string& data,
+                                      TablesMetrics* metrics_by_table_name,
+                                      Metrics* host_metrics,
+                                      TablesHistMetrics* hist_metrics_by_table_name,
+                                      HistMetrics* host_hist_metrics) {
+  JsonReader r(data);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+
+  for (const Value* entity : entities) {
+    if (FilterByAttribute(r, entity)) {
+      continue;
+    }
+    string entity_type;
+    CHECK_OK(r.ExtractString(entity, "type", &entity_type));
+    if (entity_type == "server") {
+      CHECK(ParseServerMetrics(r, entity).IsNotSupported());
+    } else if (entity_type == "table") {
+      CHECK_OK(ParseTableMetrics(r, entity,
+                                 metrics_by_table_name, host_metrics,
+                                 hist_metrics_by_table_name, host_hist_metrics));
+    } else if (entity_type == "tablet") {
+      CHECK(ParseTabletMetrics(r, entity).IsNotSupported());
+    } else {
+      LOG(FATAL) << "Unknown entity_type: " << entity_type;
+    }
+  }
+  TRACE(Substitute("Metrics parsed, entity count $0", entities.size()));
+
+  return Status::OK();
+}
+
+void MetricsCollector::CollectMetrics(const string& endpoint,
+                                      const Metrics& metrics,
+                                      const std::string& level,
+                                      uint64_t timestamp,
+                                      const std::string& extra_tags,
+                                      list<scoped_refptr<ItemBase>>* items) {
+  for (const auto& metric : metrics) {
+    items->emplace_back(
+      reporter_->ConstructItem(endpoint,
+                               metric.first,
+                               level,
+                               timestamp,
+                               metric.second,
+                               FindOrDie(metric_types_by_entity_type_["tablet"], metric.first),
+                               extra_tags));
+  }
+}
+
+void MetricsCollector::CollectMetrics(const string& endpoint,
+                      const HistMetrics& metrics,
+                      const string& level,
+                      uint64_t timestamp,
+                      const string& extra_tags,
+                      list<scoped_refptr<ItemBase>>* items) {
+  for (const auto& metric : metrics) {
+    items->emplace_back(
+      reporter_->ConstructItem(endpoint,
+                               metric.first,
+                               level,
+                               timestamp,
+                               GetHistValue(metric.second),
+                               "GAUGE",
+                               extra_tags));
+  }
+}
+
+Status MetricsCollector::ReportHostTableLevelMetrics(
+    const string& host_name,
+    uint64_t timestamp,
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& hist_metrics_by_table_name) {
+  list<scoped_refptr<ItemBase>> items;
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& table_metrics : metrics_by_table_name) {
+    const auto extra_tag = Substitute("table=$0", table_metrics.first);
+    metrics_count += table_metrics.second.size();
+    CollectMetrics(host_name, table_metrics.second, "host_table", timestamp, extra_tag, &items);
+  }
+  TRACE(Substitute("Host-table GAUGE/COUNTER type metrics collected, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  int hist_metrics_count = 0;
+  for (const auto& table_hist_metrics : hist_metrics_by_table_name) {
+    const auto extra_tag = Substitute("table=$0", table_hist_metrics.first);
+    hist_metrics_count += table_hist_metrics.second.size();
+    CollectMetrics(host_name, table_hist_metrics.second,
+                   "host_table", timestamp, extra_tag,
+                   &items);
+  }
+  TRACE(Substitute("Host-table HISTOGRAM type metrics collected, count $0", hist_metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Host-table metrics reported, count $0", metrics_count + hist_metrics_count));
+
+  return Status::OK();
+}
+
+Status MetricsCollector::ReportHostLevelMetrics(
+    const string& host_name,
+    uint64_t timestamp,
+    const Metrics& host_metrics,
+    const HistMetrics& host_hist_metrics) {
+  list<scoped_refptr<ItemBase>> items;
+  // GAUGE/COUNTER type metrics.
+  CollectMetrics(host_name, host_metrics, "host", timestamp, "", &items);
+  TRACE(Substitute("Host GAUGE/COUNTER type metrics collected, count $0", host_metrics.size()));
+
+  // HISTOGRAM type metrics.
+  CollectMetrics(host_name, host_hist_metrics, "host", timestamp, "", &items);
+  TRACE(Substitute("Host HISTOGRAM type metrics collected, count $0", host_hist_metrics.size()));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Host metrics reported, count $0",
+                   host_metrics.size() + host_hist_metrics.size()));
+
+  return Status::OK();
+}
+
+Status MetricsCollector::ReportTableLevelMetrics(
+    uint64_t timestamp,
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& hist_metrics_by_table_name) {
+  list<scoped_refptr<ItemBase>> items;
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& table_metrics : metrics_by_table_name) {
+    metrics_count += table_metrics.second.size();
+    CollectMetrics(table_metrics.first,
+                   table_metrics.second,
+                   "table", timestamp, "", &items);
+  }
+  TRACE(Substitute("Table GAUGE/COUNTER type metrics collected, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  int hist_metrics_count = 0;
+  for (const auto& table_hist_metrics : hist_metrics_by_table_name) {
+    hist_metrics_count += table_hist_metrics.second.size();
+    CollectMetrics(table_hist_metrics.first,
+                   table_hist_metrics.second,
+                   "table", timestamp, "", &items);
+  }
+  TRACE(Substitute("Table HISTOGRAM type metrics collected, count $0", hist_metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Table metrics reported, count $0", metrics_count + hist_metrics_count));
+
+  return Status::OK();
+}
+
+Status MetricsCollector::ReportClusterLevelMetrics(uint64_t timestamp,
+                                                   const Metrics& cluster_metrics) {
+  list<scoped_refptr<ItemBase>> items;
+  CollectMetrics(FLAGS_collector_cluster_name, cluster_metrics, "cluster", timestamp, "", &items);
+  TRACE(Substitute("Cluster metrics collected, count $0", cluster_metrics.size()));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Cluster metrics reported, count $0", cluster_metrics.size()));
+
+  return Status::OK();
+}
+
+int64_t MetricsCollector::GetHistValue(const vector<SimpleHistogram>& hist_values) {
+  int64_t total_count = 0;
+  double total_value = 0.0;
+  for (const auto& hist_value : hist_values) {
+    total_count += hist_value.count;
+    total_value += hist_value.count * hist_value.value;
+  }
+  int64_t value = 0;
+  if (total_count != 0) {
+    value = std::llround(total_value / total_count);
+  }
+  return value;
+}
+
+Status MetricsCollector::GetMetrics(const string& url, string* resp) {
+  CHECK(resp);
+  EasyCurl curl;
+  faststring dst;
+  //curl.set_return_headers(true);
+  RETURN_NOT_OK(curl.FetchURL(url, &dst, {"Accept-Encoding: gzip"}));
+  std::ostringstream oss;
+  string dst_str = dst.ToString();
+  if (zlib::Uncompress(Slice(dst_str), &oss).ok()) {
+    *resp = oss.str();
+  } else {
+    *resp = dst_str;
+  }
+  TRACE(Substitute("Metrics got from server: $0", url));
+
+  return Status::OK();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/metrics_collector.h b/src/kudu/collector/metrics_collector.h
new file mode 100644
index 0000000..1435e52
--- /dev/null
+++ b/src/kudu/collector/metrics_collector.h
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class JsonReader;
+class Thread;
+class ThreadPool;
+
+namespace collector {
+struct ItemBase;
+}  // namespace collector
+}  // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class NodesChecker;
+class ReporterBase;
+
+class MetricsCollector : public RefCounted<MetricsCollector> {
+ public:
+  MetricsCollector(scoped_refptr<NodesChecker> nodes_checker,
+                   scoped_refptr<ReporterBase> reporter);
+  ~MetricsCollector();
+
+  Status Init();
+  Status Start();
+  void Shutdown();
+
+  std::string ToString() const;
+
+ private:
+  friend class RefCounted<MetricsCollector>;
+
+  FRIEND_TEST(TestMetricsCollector, TestConvertStateToInt);
+  FRIEND_TEST(TestMetricsCollector, TestGetHistValue);
+  FRIEND_TEST(TestMetricsCollector, TestMergeToTableLevelMetrics);
+  FRIEND_TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics);
+  FRIEND_TEST(TestMetricsCollector, TestParseMetrics);
+  FRIEND_TEST(TestMetricsCollector, TestInitMetrics);
+  FRIEND_TEST(TestMetricsCollector, TestInitFilters);
+  FRIEND_TEST(TestMetricsCollector, TestInitMetricsUrlParameters);
+  FRIEND_TEST(TestMetricsCollector, TestInitClusterLevelMetrics);
+
+  typedef std::unordered_map<std::string, int64_t> Metrics;
+  typedef std::unordered_map<std::string, Metrics> TablesMetrics;
+  struct SimpleHistogram {
+    int64_t count;
+    int64_t value;
+    SimpleHistogram(int64_t c, int64_t v) : count(c), value(v) {
+    }
+    inline bool operator==(const SimpleHistogram& rhs) const {
+      return count == rhs.count && value == rhs.value;
+    }
+  };
+
+  typedef std::unordered_map<std::string, std::vector<SimpleHistogram>> HistMetrics;
+  typedef std::unordered_map<std::string, HistMetrics> TablesHistMetrics;
+
+  typedef std::unordered_map<std::string, std::string> MetricTypes;
+
+  Status ValidateTableFilter(const std::string& attribute_filter, const std::string& table_filter);
+  Status InitMetrics();
+  static Status ExtractMetricTypes(const JsonReader& r,
+                                   const rapidjson::Value* entity,
+                                   MetricTypes* metric_types);
+  Status InitFilters();
+  Status InitMetricsUrlParameters();
+  Status InitClusterLevelMetrics();
+
+  Status StartMetricCollectorThread();
+  void MetricCollectorThread();
+  Status CollectAndReportMetrics();
+
+  Status UpdateThreadPool(int32_t thread_count);
+
+  Status CollectAndReportHostLevelMetrics(const std::string& url,
+                                          TablesMetrics* metrics_by_table_name,
+                                          TablesHistMetrics* hist_metrics_by_table_name);
+
+  static Status MergeToTableLevelMetrics(
+      const std::vector<TablesMetrics>& hosts_metrics_by_table_name,
+      const std::vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name,
+      TablesMetrics* metrics_by_table_name,
+      TablesHistMetrics* hist_metrics_by_table_name);
+  static Status MergeToClusterLevelMetrics(const TablesMetrics& metrics_by_table_name,
+                                           const TablesHistMetrics& hist_metrics_by_table_name,
+                                           Metrics* cluster_metrics);
+
+  // Report metrics to third-party monitor system.
+  void CollectMetrics(const std::string& endpoint,
+                      const Metrics& metrics,
+                      const std::string& level,
+                      uint64_t timestamp,
+                      const std::string& extra_tags,
+                      std::list<scoped_refptr<ItemBase>>* items);
+  void CollectMetrics(const std::string& endpoint,
+                      const HistMetrics& metrics,
+                      const std::string& level,
+                      uint64_t timestamp,
+                      const std::string& extra_tags,
+                      std::list<scoped_refptr<ItemBase>>* items);
+
+  Status ReportHostTableLevelMetrics(const std::string& host_name,
+                                     uint64_t timestamp,
+                                     const TablesMetrics& metrics_by_table_name,
+                                     const TablesHistMetrics& hist_metrics_by_table_name);
+  Status ReportHostLevelMetrics(const std::string& host_name,
+                                uint64_t timestamp,
+                                const Metrics& host_metrics,
+                                const HistMetrics& host_hist_metrics);
+  Status ReportTableLevelMetrics(uint64_t timestamp,
+                                 const TablesMetrics& metrics_by_table_name,
+                                 const TablesHistMetrics& hist_metrics_by_table_name);
+  Status ReportClusterLevelMetrics(uint64_t timestamp,
+                                   const Metrics& cluster_metrics);
+  static int64_t GetHistValue(const std::vector<SimpleHistogram>& hist_values);
+
+  // Get metrics from server by http method.
+  static Status GetMetrics(const std::string& url, std::string* resp);
+
+  // Parse metrics from http response, entities may be in different types.
+  Status ParseMetrics(const std::string& data,
+                      TablesMetrics* metrics_by_table_name,
+                      Metrics* host_metrics,
+                      TablesHistMetrics* hist_metrics_by_table_name,
+                      HistMetrics* host_hist_metrics);
+  static Status ParseServerMetrics(const JsonReader& r,
+                                   const rapidjson::Value* entity);
+  Status ParseTableMetrics(const JsonReader& r,
+                           const rapidjson::Value* entity,
+                           TablesMetrics* metrics_by_table_name,
+                           Metrics* host_metrics,
+                           TablesHistMetrics* hist_metrics_by_table_name,
+                           HistMetrics* host_hist_metrics) const;
+  static Status ParseTabletMetrics(const JsonReader& r,
+                                   const rapidjson::Value* entity);
+
+  // Return true when this entity could be filtered.
+  // When server side support attributes filter, this function has no effect.
+  bool FilterByAttribute(const JsonReader& r,
+                         const rapidjson::Value* entity) const;
+  Status GetNumberMetricValue(const rapidjson::Value* metric,
+                              const std::string& metric_name,
+                              int64_t* result) const;
+  Status GetStringMetricValue(const rapidjson::Value* metric,
+                              const std::string& metric_name,
+                              int64_t* result) const;
+  static Status ConvertStateToInt(const std::string& value, int64_t* result);
+
+  static const std::set<std::string> kRegisterPercentiles;
+
+  bool initialized_;
+
+  scoped_refptr<NodesChecker> nodes_checker_;
+  scoped_refptr<ReporterBase> reporter_;
+
+  std::map<std::string, MetricTypes> metric_types_by_entity_type_;
+  // Attribute filter, attributes not in this map will be filtered if it's not empty.
+  // attribute name ---> attribute values
+  std::unordered_map<std::string, std::set<std::string>> attributes_filter_;
+  std::string metric_url_parameters_;
+  Metrics cluster_metrics_;
+
+  CountDownLatch stop_background_threads_latch_;
+  scoped_refptr<Thread> metric_collector_thread_;
+  std::unique_ptr<ThreadPool> host_metric_collector_thread_pool_;
+
+  DISALLOW_COPY_AND_ASSIGN(MetricsCollector);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/nodes_checker-test.cc b/src/kudu/collector/nodes_checker-test.cc
new file mode 100644
index 0000000..2390a52
--- /dev/null
+++ b/src/kudu/collector/nodes_checker-test.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/nodes_checker.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/rebalance/cluster_status.h"
+
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::HealthCheckResult;
+
+namespace kudu {
+namespace collector {
+
+TEST(TestNodesChecker, TestExtractServerHealthStatus) {
+  ASSERT_EQ(ServerHealth::HEALTHY,
+            NodesChecker::ExtractServerHealthStatus("HEALTHY"));
+  ASSERT_EQ(ServerHealth::UNAUTHORIZED,
+            NodesChecker::ExtractServerHealthStatus("UNAUTHORIZED"));
+  ASSERT_EQ(ServerHealth::UNAVAILABLE,
+            NodesChecker::ExtractServerHealthStatus("UNAVAILABLE"));
+  ASSERT_EQ(ServerHealth::WRONG_SERVER_UUID,
+            NodesChecker::ExtractServerHealthStatus("WRONG_SERVER_UUID"));
+}
+
+TEST(TestNodesChecker, TestExtractTableHealthStatus) {
+  ASSERT_EQ(HealthCheckResult::HEALTHY,
+            NodesChecker::ExtractTableHealthStatus("HEALTHY"));
+  ASSERT_EQ(HealthCheckResult::RECOVERING,
+            NodesChecker::ExtractTableHealthStatus("RECOVERING"));
+  ASSERT_EQ(HealthCheckResult::UNDER_REPLICATED,
+            NodesChecker::ExtractTableHealthStatus("UNDER_REPLICATED"));
+  ASSERT_EQ(HealthCheckResult::UNAVAILABLE,
+            NodesChecker::ExtractTableHealthStatus("UNAVAILABLE"));
+  ASSERT_EQ(HealthCheckResult::CONSENSUS_MISMATCH,
+            NodesChecker::ExtractTableHealthStatus("CONSENSUS_MISMATCH"));
+}
+}  // namespace collector
+}  // namespace kudu
+
diff --git a/src/kudu/collector/nodes_checker.cc b/src/kudu/collector/nodes_checker.cc
new file mode 100644
index 0000000..8b64c29
--- /dev/null
+++ b/src/kudu/collector/nodes_checker.cc
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/nodes_checker.h"
+
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <ostream>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+DECLARE_string(collector_cluster_name);
+DECLARE_string(collector_master_addrs);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using rapidjson::Value;
+using std::list;
+using std::string;
+using std::vector;
+using strings::Substitute;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::HealthCheckResult;
+
+namespace kudu {
+namespace collector {
+
+const std::string NodesChecker::kMaster = "master";
+const std::string NodesChecker::kTserver = "tserver";
+
+NodesChecker::NodesChecker(scoped_refptr<ReporterBase> reporter)
+  : initialized_(false),
+    reporter_(std::move(reporter)),
+    stop_background_threads_latch_(1) {
+}
+
+NodesChecker::~NodesChecker() {
+  Shutdown();
+}
+
+Status NodesChecker::Init() {
+  CHECK(!initialized_);
+
+  RETURN_NOT_OK(UpdateNodes());
+  CHECK(!master_http_addrs_.empty());
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+Status NodesChecker::Start() {
+  CHECK(initialized_);
+
+  RETURN_NOT_OK(StartNodesCheckerThread());
+
+  return Status::OK();
+}
+
+void NodesChecker::Shutdown() {
+  if (initialized_) {
+    string name = ToString();
+    LOG(INFO) << name << " shutting down...";
+
+    stop_background_threads_latch_.CountDown();
+
+    if (nodes_checker_thread_) {
+      nodes_checker_thread_->Join();
+    }
+
+    LOG(INFO) << name << " shutdown complete.";
+  }
+}
+
+string NodesChecker::ToString() const {
+  return "NodesChecker";
+}
+
+vector<string> NodesChecker::GetNodes() {
+  shared_lock<RWMutex> l(nodes_lock_);
+  return tserver_http_addrs_;
+}
+
+string NodesChecker::GetFirstMaster() {
+  shared_lock<RWMutex> l(nodes_lock_);
+  CHECK(!master_http_addrs_.empty());
+  return master_http_addrs_[0];
+}
+
+Status NodesChecker::StartNodesCheckerThread() {
+  return Thread::Create("collector", "nodes-checker", &NodesChecker::NodesCheckerThread,
+                        this, &nodes_checker_thread_);
+}
+
+void NodesChecker::NodesCheckerThread() {
+  MonoTime check_time;
+  do {
+    check_time = MonoTime::Now();
+    UpdateAndCheckNodes();
+    check_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
+  } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(check_time));
+  LOG(INFO) << "FalconPusherThread exit";
+}
+
+void NodesChecker::UpdateAndCheckNodes() {
+  LOG(INFO) << "Start to UpdateAndCheckNodes";
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "NodesChecker::UpdateAndCheckNodes");
+  WARN_NOT_OK(UpdateNodes(), "Unable to update nodes");
+  WARN_NOT_OK(CheckNodes(), "Unable to check nodes");
+  int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+}
+
+Status NodesChecker::UpdateNodes() {
+  RETURN_NOT_OK(UpdateServers(kMaster));
+  RETURN_NOT_OK(UpdateServers(kTserver));
+  return Status::OK();
+}
+
+Status NodesChecker::UpdateServers(const std::string& role) {
+  DCHECK(role == kTserver || role == kMaster);
+  vector<string> args = {
+    role,
+    "list",
+    FLAGS_collector_master_addrs,
+    "-columns=http-addresses",
+    "-format=json",
+    Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000)
+  };
+  string tool_stdout;
+  string tool_stderr;
+  RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+                        Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+  TRACE(Substitute("'$0 list' done", role));
+
+  JsonReader r(tool_stdout);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> servers;
+  CHECK_OK(r.ExtractObjectArray(r.root(), nullptr, &servers));
+  vector<string> server_http_addrs;
+  for (const Value* server : servers) {
+    string http_address;
+    CHECK_OK(r.ExtractString(server, "http-addresses", &http_address));
+    server_http_addrs.emplace_back(http_address);
+  }
+  TRACE(Substitute("Result parsed, nodes count $0", server_http_addrs.size()));
+
+  if (role == kTserver) {
+    std::lock_guard<RWMutex> l(nodes_lock_);
+    tserver_http_addrs_.swap(server_http_addrs);
+  } else {
+    std::lock_guard<RWMutex> l(nodes_lock_);
+    master_http_addrs_.swap(server_http_addrs);
+  }
+  TRACE("Nodes updated");
+
+  return Status::OK();
+}
+
+Status NodesChecker::CheckNodes() const {
+  vector<string> args = {
+    "cluster",
+    "ksck",
+    FLAGS_collector_master_addrs,
+    "-consensus=false",
+    "-ksck_format=json_compact",
+    "-color=never",
+    "-sections=MASTER_SUMMARIES,TSERVER_SUMMARIES,TABLE_SUMMARIES,TOTAL_COUNT",
+    Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000)
+  };
+  string tool_stdout;
+  string tool_stderr;
+  WARN_NOT_OK(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+              Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+
+  TRACE("'cluster ksck' done");
+
+  RETURN_NOT_OK(ReportNodesMetrics(tool_stdout));
+  return Status::OK();
+}
+
+Status NodesChecker::ReportNodesMetrics(const string& data) const {
+  JsonReader r(data);
+  RETURN_NOT_OK(r.Init());
+  const Value* ksck;
+  CHECK_OK(r.ExtractObject(r.root(), nullptr, &ksck));
+  auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  list<scoped_refptr<ItemBase>> items;
+  // Maters health info.
+  vector<const Value*> masters;
+  CHECK_OK(r.ExtractObjectArray(ksck, "master_summaries", &masters));
+  for (const Value* master : masters) {
+    string address;
+    CHECK_OK(r.ExtractString(master, "address", &address));
+    string health;
+    CHECK_OK(r.ExtractString(master, "health", &health));
+    items.emplace_back(reporter_->ConstructItem(
+      ExtractHostName(address),
+      "kudu-master-health",
+      "host",
+      timestamp,
+      static_cast<int64_t>(ExtractServerHealthStatus(health)),
+      "GAUGE",
+      ""));
+  }
+  TRACE(Substitute("Maters health info reported, count $0", masters.size()));
+
+  // Tservers health info.
+  vector<const Value*> tservers;
+  Status s = r.ExtractObjectArray(ksck, "tserver_summaries", &tservers);
+  CHECK(s.ok() || s.IsNotFound());
+  if (s.ok()) {
+    for (const Value* tserver : tservers) {
+      string address;
+      CHECK_OK(r.ExtractString(tserver, "address", &address));
+      string health;
+      CHECK_OK(r.ExtractString(tserver, "health", &health));
+      items.emplace_back(reporter_->ConstructItem(
+        ExtractHostName(address),
+        "kudu-tserver-health",
+        "host",
+        timestamp,
+        static_cast<int64_t>(ExtractServerHealthStatus(health)),
+        "GAUGE",
+        ""));
+    }
+    TRACE(Substitute("Tservers health info reported, count $0", tservers.size()));
+  }
+
+  // Tables health info.
+  uint32_t health_table_count = 0;
+  vector<const Value*> tables;
+  s = r.ExtractObjectArray(ksck, "table_summaries", &tables);
+  CHECK(s.ok() || s.IsNotFound());
+  if (s.ok()) {
+    for (const Value* table : tables) {
+      string name;
+      CHECK_OK(r.ExtractString(table, "name", &name));
+      string health;
+      CHECK_OK(r.ExtractString(table, "health", &health));
+      HealthCheckResult health_status = ExtractTableHealthStatus(health);
+      items.emplace_back(reporter_->ConstructItem(
+        name,
+        "kudu-table-health",
+        "table",
+        timestamp,
+        static_cast<int64_t>(health_status),
+        "GAUGE",
+        ""));
+      if (health_status == HealthCheckResult::HEALTHY) {
+        health_table_count += 1;
+      }
+    }
+    TRACE(Substitute("Tables health info reported, count $0", tables.size()));
+  }
+
+  // Healthy table ratio.
+  if (!tables.empty()) {
+    items.emplace_back(reporter_->ConstructItem(
+      FLAGS_collector_cluster_name,
+      "healthy_table_proportion",
+      "cluster",
+      timestamp,
+      100 * health_table_count / tables.size(),
+      "GAUGE",
+      ""));
+    TRACE("Healthy table ratio reported");
+  }
+
+  // Count summaries.
+  vector<const Value*> count_summaries;
+  CHECK_OK(r.ExtractObjectArray(ksck, "count_summaries", &count_summaries));
+  for (const Value* count_summarie : count_summaries) {
+    // TODO(yingchun) should auto iterate items
+    static const vector<string>
+        count_names({"masters", "tservers", "tables", "tablets", "replicas"});
+    for (const auto& name : count_names) {
+      int64_t count;
+      CHECK_OK(r.ExtractInt64(count_summarie, name.c_str(), &count));
+      items.emplace_back(reporter_->ConstructItem(
+        FLAGS_collector_cluster_name,
+        name + "_count",
+        "cluster",
+        timestamp,
+        count,
+        "GAUGE",
+        ""));
+    }
+  }
+  TRACE("Count summaries reported");
+
+  reporter_->PushItems(std::move(items));
+  TRACE("Pushed");
+
+  return Status::OK();
+}
+
+ServerHealth NodesChecker::ExtractServerHealthStatus(const string& health) {
+  if (health == "HEALTHY") return ServerHealth::HEALTHY;
+  if (health == "UNAUTHORIZED") return ServerHealth::UNAUTHORIZED;
+  if (health == "UNAVAILABLE") return ServerHealth::UNAVAILABLE;
+  if (health == "WRONG_SERVER_UUID") return ServerHealth::WRONG_SERVER_UUID;
+  CHECK(false) << "Unknown server health: " << health;
+  __builtin_unreachable();
+}
+
+HealthCheckResult NodesChecker::ExtractTableHealthStatus(const string& health) {
+  if (health == "HEALTHY") return HealthCheckResult::HEALTHY;
+  if (health == "RECOVERING") return HealthCheckResult::RECOVERING;
+  if (health == "UNDER_REPLICATED") return HealthCheckResult::UNDER_REPLICATED;
+  if (health == "UNAVAILABLE") return HealthCheckResult::UNAVAILABLE;
+  if (health == "CONSENSUS_MISMATCH") return HealthCheckResult::CONSENSUS_MISMATCH;
+  CHECK(false)  << "Unknown table health: " << health;
+  __builtin_unreachable();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/nodes_checker.h b/src/kudu/collector/nodes_checker.h
new file mode 100644
index 0000000..26aee89
--- /dev/null
+++ b/src/kudu/collector/nodes_checker.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+}  // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class ReporterBase;
+
+class NodesChecker : public RefCounted<NodesChecker> {
+ public:
+  explicit NodesChecker(scoped_refptr<ReporterBase> reporter);
+  ~NodesChecker();
+
+  Status Init();
+  Status Start();
+  void Shutdown();
+
+  std::string ToString() const;
+
+  std::vector<std::string> GetNodes();
+  std::string GetFirstMaster();
+
+ private:
+  friend class RefCounted<NodesChecker>;
+
+  FRIEND_TEST(TestNodesChecker, TestExtractServerHealthStatus);
+  FRIEND_TEST(TestNodesChecker, TestExtractTableHealthStatus);
+
+  Status StartNodesCheckerThread();
+  void NodesCheckerThread();
+
+  void UpdateAndCheckNodes();
+  Status UpdateNodes();
+  Status UpdateServers(const std::string& role);
+  Status CheckNodes() const;
+  Status ReportNodesMetrics(const std::string& data) const;
+
+  static cluster_summary::ServerHealth ExtractServerHealthStatus(const std::string& health);
+  static cluster_summary::HealthCheckResult ExtractTableHealthStatus(const std::string& health);
+
+  static const std::string kMaster;
+  static const std::string kTserver;
+
+  bool initialized_;
+
+  scoped_refptr<ReporterBase> reporter_;
+
+  CountDownLatch stop_background_threads_latch_;
+  scoped_refptr<Thread> nodes_checker_thread_;
+
+  mutable RWMutex nodes_lock_;
+  std::vector<std::string> tserver_http_addrs_;
+  std::vector<std::string> master_http_addrs_;
+
+  DISALLOW_COPY_AND_ASSIGN(NodesChecker);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/reporter_base.h b/src/kudu/collector/reporter_base.h
new file mode 100644
index 0000000..d03b80c
--- /dev/null
+++ b/src/kudu/collector/reporter_base.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/server/server_base.h"
+#include "kudu/tools/ksck_results.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+namespace collector {
+
+struct ItemBase : public RefCounted<ItemBase> {
+  virtual ~ItemBase() = default;
+
+ private:
+  friend class RefCounted<ItemBase>;
+};
+
+class ReporterBase : public RefCounted<ReporterBase> {
+ public:
+  virtual ~ReporterBase() = default;
+
+  virtual Status Init() = 0;
+  virtual Status Start() = 0;
+  virtual void Shutdown() = 0;
+
+  virtual std::string ToString() const = 0;
+
+  // TODO(yingchun) This function is not generic enough for base class.
+  virtual scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+                                                std::string metric,
+                                                std::string level,
+                                                uint64_t timestamp,
+                                                int64_t value,
+                                                std::string counter_type,
+                                                std::string extra_tags) = 0;
+  virtual Status PushItems(std::list<scoped_refptr<ItemBase>> items) = 0;
+
+ protected:
+  friend class RefCounted<ReporterBase>;
+};
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6aefb42..21de53d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -61,6 +61,7 @@
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 8e98c5d..7834ea4 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <stdlib.h>
-
 #include <algorithm>
 #include <cstdint>
 #include <functional>
diff --git a/src/kudu/util/jsonreader.h b/src/kudu/util/jsonreader.h
index 125b762..dcc6603 100644
--- a/src/kudu/util/jsonreader.h
+++ b/src/kudu/util/jsonreader.h
@@ -28,6 +28,10 @@
 
 namespace kudu {
 
+namespace collector {
+class MetricsCollector;
+}  // namespace collector
+
 // Wraps the JSON parsing functionality of rapidjson::Document.
 //
 // Unlike JsonWriter, this class does not hide rapidjson internals from
@@ -93,6 +97,8 @@ class JsonReader {
   const rapidjson::Value* root() const { return &document_; }
 
  private:
+  friend class collector::MetricsCollector;
+
   Status ExtractField(const rapidjson::Value* object,
                       const char* field,
                       const rapidjson::Value** result) const;