You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2020/08/15 14:59:35 UTC
[kudu] 07/23: [collector] Add CPP implemented collector
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to tag kudu-1.12.0-mdh1.0.0-4c2c075-centos-release
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d9b29d901bcbabd1210064f84d73ece9931baff2
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Wed Jun 19 16:20:06 2019 +0800
[collector] Add CPP implemented collector
Change-Id: I498347605a09e832d3398e76d9cefd3e52b1cfc6
---
CMakeLists.txt | 1 +
src/kudu/collector/CMakeLists.txt | 63 ++
src/kudu/collector/cluster_rebalancer-test.cc | 50 ++
src/kudu/collector/cluster_rebalancer.cc | 152 +++++
src/kudu/collector/cluster_rebalancer.h | 68 ++
src/kudu/collector/collector-test.cc | 44 ++
src/kudu/collector/collector.cc | 170 +++++
src/kudu/collector/collector.h | 74 +++
src/kudu/collector/collector_main.cc | 73 +++
src/kudu/collector/collector_util-test.cc | 33 +
src/kudu/collector/collector_util.cc | 46 ++
src/kudu/collector/collector_util.h | 32 +
src/kudu/collector/falcon_reporter-test.cc | 122 ++++
src/kudu/collector/falcon_reporter.cc | 255 ++++++++
src/kudu/collector/falcon_reporter.h | 108 ++++
src/kudu/collector/local_reporter.cc | 84 +++
src/kudu/collector/local_reporter.h | 58 ++
src/kudu/collector/metrics_collector-test.cc | 777 +++++++++++++++++++++++
src/kudu/collector/metrics_collector.cc | 852 ++++++++++++++++++++++++++
src/kudu/collector/metrics_collector.h | 205 +++++++
src/kudu/collector/nodes_checker-test.cc | 55 ++
src/kudu/collector/nodes_checker.cc | 358 +++++++++++
src/kudu/collector/nodes_checker.h | 90 +++
src/kudu/collector/reporter_base.h | 73 +++
src/kudu/master/catalog_manager.cc | 1 +
src/kudu/tools/tool_action_table.cc | 2 -
src/kudu/util/jsonreader.h | 6 +
27 files changed, 3850 insertions(+), 2 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d2bb441..1797da9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1423,6 +1423,7 @@ add_subdirectory(src/kudu/cfile)
add_subdirectory(src/kudu/client)
add_subdirectory(src/kudu/clock)
add_subdirectory(src/kudu/codegen)
+add_subdirectory(src/kudu/collector)
add_subdirectory(src/kudu/common)
add_subdirectory(src/kudu/consensus)
add_subdirectory(src/kudu/experiments)
diff --git a/src/kudu/collector/CMakeLists.txt b/src/kudu/collector/CMakeLists.txt
new file mode 100644
index 0000000..5bbb1cb
--- /dev/null
+++ b/src/kudu/collector/CMakeLists.txt
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#########################################
+# collector
+#########################################
+
+set(COLLECTOR_SRCS
+ cluster_rebalancer.cc
+ collector.cc
+ collector_util.cc
+ falcon_reporter.cc
+ local_reporter.cc
+ metrics_collector.cc
+ nodes_checker.cc)
+
+# Library built from the collector sources listed in COLLECTOR_SRCS.
+# NOTE(review): kudu_tools_test_util is presumably linked because
+# cluster_rebalancer.cc includes kudu/tools/tool_test_util.h and calls
+# tools::RunKuduTool() -- confirm that pulling a test utility library into a
+# non-test target is intended.
+add_library(collector ${COLLECTOR_SRCS})
+target_link_libraries(collector
+ kudu_curl_util
+ kudu_tools_test_util
+ log
+ security
+ server_process)
+
+#########################################
+# kudu-collector
+#########################################
+
+add_executable(kudu-collector collector_main.cc)
+target_link_libraries(kudu-collector
+ ${SANITIZER_OPTIONS_OVERRIDE}
+ ${KRB5_REALM_OVERRIDE}
+ collector
+ ${KUDU_BASE_LIBS})
+
+option(KUDU_COLLECTOR_INSTALL "Whether to install the Kudu Collector executable" ON)
+if(KUDU_COLLECTOR_INSTALL)
+ install(TARGETS kudu-collector RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR})
+else()
+ message(STATUS "Skipping install rule for the Kudu Collector executable")
+endif()
+
+SET_KUDU_TEST_LINK_LIBS(collector)
+ADD_KUDU_TEST(collector-test)
+ADD_KUDU_TEST(collector_util-test)
+ADD_KUDU_TEST(cluster_rebalancer-test)
+ADD_KUDU_TEST(falcon_reporter-test)
+ADD_KUDU_TEST(metrics_collector-test)
+ADD_KUDU_TEST(nodes_checker-test)
diff --git a/src/kudu/collector/cluster_rebalancer-test.cc b/src/kudu/collector/cluster_rebalancer-test.cc
new file mode 100644
index 0000000..0a1e0a7
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer-test.cc
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/cluster_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace collector {
+
+// Verifies that ClusterRebalancer::ValidateHMTime() returns InvalidArgument
+// for malformed or out-of-range strings and OK for in-range 'HH:MM' strings,
+// including the boundary values 00:00 and 23:59.
+TEST(TestClusterRebalancer, TestValidateHMTime) {
+ // 'time' in error format.
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:34:56").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("1:23").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:3").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime(":3").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12.34").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("-1:30").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("24:30").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:-1").IsInvalidArgument());
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:60").IsInvalidArgument());
+
+ // 'time' in valid format.
+ ASSERT_OK(ClusterRebalancer::ValidateHMTime("12:34"));
+ ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:00"));
+ ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:59"));
+ ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:00"));
+ ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:59"));
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/cluster_rebalancer.cc b/src/kudu/collector/cluster_rebalancer.cc
new file mode 100644
index 0000000..0015544
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/cluster_rebalancer.h"
+
+#include <stdio.h>
+#include <time.h>
+
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_bool(auto_rebalance, true, "Whether to rebalance cluster automatically");
+DEFINE_string(rebalance_time, "00:00",
+ "Time to perform cluster rebalance, format in HH:MM");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_string(collector_master_addrs);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+// Creates a rebalancer that is not yet initialized. The stop latch starts
+// with a count of 1, which Shutdown() counts down.
+ClusterRebalancer::ClusterRebalancer()
+ : initialized_(false),
+ stop_background_threads_latch_(1) {
+}
+
+// Runs Shutdown() so the background thread, if any, is stopped and joined
+// before the object goes away.
+ClusterRebalancer::~ClusterRebalancer() {
+ Shutdown();
+}
+
+// One-time initialization: validates --rebalance_time and, on success, marks
+// this object as initialized. Must not be called on an already initialized
+// instance (enforced by CHECK).
+//
+// Returns the result of ValidateHMTime() for the flag value.
+Status ClusterRebalancer::Init() {
+  CHECK(!initialized_);
+
+  const Status validation = ValidateHMTime(FLAGS_rebalance_time);
+  if (validation.ok()) {
+    initialized_ = true;
+  }
+  return validation;
+}
+
+// Starts the background rebalancer thread when --auto_rebalance is enabled;
+// otherwise does nothing. Init() must have succeeded first (enforced by
+// CHECK).
+//
+// Returns Status::OK() when auto rebalance is disabled, otherwise the result
+// of StartClusterRebalancerThread().
+Status ClusterRebalancer::Start() {
+  CHECK(initialized_);
+
+  return FLAGS_auto_rebalance ? StartClusterRebalancerThread() : Status::OK();
+}
+
+// Stops the rebalancer. Does nothing unless Init() has succeeded. Counts down
+// the stop latch so the background loop exits, then joins the thread if one
+// was created. 'initialized_' is left unchanged.
+void ClusterRebalancer::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+
+  const string self = ToString();
+  LOG(INFO) << self << " shutting down...";
+
+  stop_background_threads_latch_.CountDown();
+  if (cluster_rebalancer_thread_) {
+    cluster_rebalancer_thread_->Join();
+  }
+
+  LOG(INFO) << self << " shutdown complete.";
+}
+
+// Returns the fixed component name used in this class's log messages.
+string ClusterRebalancer::ToString() const {
+ return "ClusterRebalancer";
+}
+
+// Creates the "cluster-rebalancer" thread running ClusterRebalancerThread()
+// on this object and stores its handle in 'cluster_rebalancer_thread_'.
+// Returns the result of Thread::Create().
+Status ClusterRebalancer::StartClusterRebalancerThread() {
+ return Thread::Create("server", "cluster-rebalancer", &ClusterRebalancer::ClusterRebalancerThread,
+ this, &cluster_rebalancer_thread_);
+}
+
+// Body of the background rebalancer thread.
+//
+// If RunOnceMode() is true the loop never runs. Otherwise the thread wakes up
+// every 60 seconds until the stop latch's WaitFor() returns true (Shutdown()
+// counts the latch down). On each wake-up the current time is formatted as
+// "%H:%M" and compared with --rebalance_time; on an exact string match
+// RebalanceCluster() is invoked, and a failure is reported through
+// WARN_NOT_OK rather than propagated.
+//
+// NOTE(review): consecutive checks are 60 seconds plus the loop body's own
+// run time apart, so wake-ups drift and a given HH:MM minute can in principle
+// be skipped -- confirm whether a missed rebalance is acceptable.
+void ClusterRebalancer::ClusterRebalancerThread() {
+ const MonoDelta kWait = MonoDelta::FromSeconds(60);
+ while (!RunOnceMode() && !stop_background_threads_latch_.WaitFor(kWait)) {
+ string dst;
+ // NOTE(review): the final 'true' argument presumably selects local time --
+ // confirm against StringAppendStrftime() in kudu/gutil/walltime.h.
+ StringAppendStrftime(&dst, "%H:%M", time(nullptr), true);
+ if (dst == FLAGS_rebalance_time) {
+ WARN_NOT_OK(RebalanceCluster(), "Unable to rebalance cluster");
+ }
+ }
+}
+
+// Invokes tools::RunKuduTool() with the arguments
+// "cluster rebalance <--collector_master_addrs>", capturing the tool's stdout
+// and stderr.
+//
+// On failure, returns the tool's status with the captured stdout/stderr
+// prepended. On success, logs the captured stdout at INFO level and returns
+// Status::OK().
+Status ClusterRebalancer::RebalanceCluster() {
+ vector<string> args = {
+ "cluster",
+ "rebalance",
+ FLAGS_collector_master_addrs
+ };
+ string tool_stdout;
+ string tool_stderr;
+ RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+ Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+ // Start the tool output on its own line, after the glog prefix.
+ LOG(INFO) << std::endl
+ << tool_stdout;
+ return Status::OK();
+}
+
+// Checks that 'time' is a wall-clock time written strictly as 'HH:MM':
+// exactly two decimal digits, a colon, and two decimal digits, with the hour
+// in [00, 23] and the minute in [00, 59].
+//
+// The check is positional rather than based on sscanf("%d:%d"), because
+// sscanf skips leading whitespace and accepts signs and arbitrary digit
+// counts, so 5-character strings such as " 1:23", "+1:30" or "001:2" would
+// be accepted. ClusterRebalancerThread() compares the flag value verbatim
+// with a "%H:%M" formatted string, so such a value could never match and the
+// rebalance would silently never run.
+//
+// Returns Status::OK() if 'time' is valid, Status::InvalidArgument otherwise.
+Status ClusterRebalancer::ValidateHMTime(const string& time) {
+  const auto is_digit = [](char c) { return '0' <= c && c <= '9'; };
+
+  if (time.size() == 5 &&
+      is_digit(time[0]) && is_digit(time[1]) &&
+      time[2] == ':' &&
+      is_digit(time[3]) && is_digit(time[4])) {
+    const int hour = (time[0] - '0') * 10 + (time[1] - '0');
+    const int minute = (time[3] - '0') * 10 + (time[4] - '0');
+    // Both values are non-negative by construction; only the upper bounds
+    // need checking.
+    if (hour < 24 && minute < 60) {
+      return Status::OK();
+    }
+  }
+
+  return Status::InvalidArgument(
+      Substitute("Invalid time format '$0', should in format 'HH:MM'", time));
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/cluster_rebalancer.h b/src/kudu/collector/cluster_rebalancer.h
new file mode 100644
index 0000000..ed08736
--- /dev/null
+++ b/src/kudu/collector/cluster_rebalancer.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+} // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+// Collector component that triggers a cluster rebalance at a configured time
+// of day. When --auto_rebalance is set, Start() launches a background thread
+// that periodically compares the current "HH:MM" time with --rebalance_time
+// and runs the rebalance on a match.
+//
+// Expected call order: Init(), Start(), Shutdown(). The destructor also calls
+// Shutdown().
+class ClusterRebalancer : public RefCounted<ClusterRebalancer> {
+ public:
+ ClusterRebalancer();
+ ~ClusterRebalancer();
+
+ // Validates --rebalance_time and marks the object as initialized.
+ Status Init();
+ // Starts the background thread if --auto_rebalance is set.
+ Status Start();
+ // Signals the background thread to stop and joins it; no-op before Init().
+ void Shutdown();
+
+ // Returns the component name used in log messages.
+ std::string ToString() const;
+
+ private:
+ friend class RefCounted<ClusterRebalancer>;
+
+ FRIEND_TEST(TestClusterRebalancer, TestValidateHMTime);
+
+ // Start thread to rebalance cluster.
+ Status StartClusterRebalancerThread();
+ // Thread body: periodic time check that triggers RebalanceCluster().
+ void ClusterRebalancerThread();
+ // Runs the rebalance through tools::RunKuduTool().
+ static Status RebalanceCluster();
+
+ // Returns OK if 'time' is an acceptable 'HH:MM' string, InvalidArgument
+ // otherwise.
+ static Status ValidateHMTime(const std::string& time);
+
+ // Set to true by a successful Init().
+ bool initialized_;
+
+ // Counted down by Shutdown() to make the background thread exit.
+ CountDownLatch stop_background_threads_latch_;
+
+ // Handle of the background thread; only set once Start() has created it.
+ scoped_refptr<Thread> cluster_rebalancer_thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(ClusterRebalancer);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector-test.cc b/src/kudu/collector/collector-test.cc
new file mode 100644
index 0000000..e8e1298
--- /dev/null
+++ b/src/kudu/collector/collector-test.cc
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+namespace collector {
+
+// Verifies Collector::ValidateIntervalAndTimeout(): the interval must lie in
+// [10, 60] and the timeout in (0, interval); anything else must yield
+// InvalidArgument.
+TEST(TestCollector, TestValidateIntervalAndTimeout) {
+ // 'interval' in error range.
+ ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(9, 1).IsInvalidArgument());
+ ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(61, 1).IsInvalidArgument());
+
+ // 'timeout' in error range.
+ ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 0).IsInvalidArgument());
+ ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 10).IsInvalidArgument());
+
+ // Both 'interval' and 'timeout' are in valid range.
+ ASSERT_OK(Collector::ValidateIntervalAndTimeout(10, 9));
+ ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 9));
+ ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 59));
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/collector.cc b/src/kudu/collector/collector.cc
new file mode 100644
index 0000000..1c930e9
--- /dev/null
+++ b/src/kudu/collector/collector.cc
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector.h"
+
+#include <ostream>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/collector/cluster_rebalancer.h"
+#include "kudu/collector/falcon_reporter.h"
+#include "kudu/collector/local_reporter.h"
+#include "kudu/collector/metrics_collector.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/init.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+// Command-line flags of the collector. Collector::Init() validates
+// --collector_interval_sec and --collector_timeout_sec together and selects
+// the reporter from --collector_report_method.
+DEFINE_string(collector_cluster_name, "",
+              "Cluster name of this collector to operate");
+DEFINE_string(collector_master_addrs, "",
+              "Comma-separated list of Kudu master addresses where each address is of "
+              "form 'hostname:port'");
+DEFINE_int32(collector_interval_sec, 60,
+             "Number of interval seconds to collect metrics");
+// Collector::Init() accepts exactly "falcon" and "local"; any other value,
+// including the empty default, is fatal at startup.
+DEFINE_string(collector_report_method, "",
+              "Which monitor system the metrics reported to. "
+              "Now supported systems: falcon, local");
+DEFINE_int32(collector_timeout_sec, 10,
+             "Number of seconds to wait for a master, tserver, or CLI tool to return metrics");
+DEFINE_int32(collector_warn_threshold_ms, 1000,
+             "If a task takes more than this number of milliseconds, issue a warning with a "
+             "trace.");
+
+DECLARE_string(principal);
+DECLARE_string(keytab_file);
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+// Creates a collector that is not yet initialized. The stop latch starts with
+// a count of 1, which Shutdown() counts down.
+Collector::Collector()
+ : initialized_(false),
+ stop_background_threads_latch_(1) {
+}
+
+// Runs Shutdown() so components and the log deleter thread are stopped before
+// the object goes away.
+Collector::~Collector() {
+ Shutdown();
+}
+
+// Validates flags, then constructs and initializes all collector components.
+//
+// Steps, in order: validate --collector_interval_sec/--collector_timeout_sec,
+// call security::InitKerberosForServer() with --principal/--keytab_file,
+// create the reporter selected by --collector_report_method ("falcon" or
+// "local"), then create and Init() the nodes checker, metrics collector and
+// cluster rebalancer.
+//
+// Returns a non-OK Status only for the flag validation and Kerberos steps.
+// An unsupported report method or a failing component Init() aborts the
+// process (LOG(FATAL) / CHECK_OK) instead of returning an error.
+Status Collector::Init() {
+ CHECK(!initialized_);
+
+ RETURN_NOT_OK(ValidateIntervalAndTimeout(FLAGS_collector_interval_sec,
+ FLAGS_collector_timeout_sec));
+ RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
+
+ if (FLAGS_collector_report_method == "falcon") {
+ reporter_.reset(new FalconReporter());
+ } else if (FLAGS_collector_report_method == "local") {
+ reporter_.reset(new LocalReporter());
+ } else {
+ LOG(FATAL) << Substitute("Unsupported FLAGS_collector_report_method $0",
+ FLAGS_collector_report_method);
+ }
+ CHECK_OK(reporter_->Init());
+ // The nodes checker is handed the reporter, and the metrics collector is
+ // handed both, so they are created in this order.
+ nodes_checker_.reset(new NodesChecker(reporter_));
+ CHECK_OK(nodes_checker_->Init());
+ metrics_collector_.reset(new MetricsCollector(nodes_checker_, reporter_));
+ CHECK_OK(metrics_collector_->Init());
+ cluster_rebalancer_.reset(new ClusterRebalancer());
+ CHECK_OK(cluster_rebalancer_->Init());
+
+ initialized_ = true;
+ return Status::OK();
+}
+
+// Starts the collector: flushes the startup log messages, launches the excess
+// log file deleter thread, then starts the reporter, nodes checker, metrics
+// collector and cluster rebalancer in that order. Init() must have succeeded
+// first (enforced by CHECK).
+//
+// Returns a non-OK Status if the log deleter thread or the cluster rebalancer
+// fails to start.
+Status Collector::Start() {
+  CHECK(initialized_);
+
+  google::FlushLogFiles(google::INFO); // Flush the startup messages.
+
+  RETURN_NOT_OK(StartExcessLogFileDeleterThread());
+
+  // NOTE(review): the return types of these three Start() calls are declared
+  // in headers not visible here; if they also return Status, their results
+  // should be propagated as well -- confirm.
+  reporter_->Start();
+  nodes_checker_->Start();
+  metrics_collector_->Start();
+
+  // ClusterRebalancer::Start() returns a Status (its thread creation can
+  // fail); propagate it instead of silently dropping it.
+  RETURN_NOT_OK(cluster_rebalancer_->Start());
+
+  return Status::OK();
+}
+
+// Stops the collector. Does nothing unless Init() has succeeded. Shuts down
+// the reporter, metrics collector, nodes checker and cluster rebalancer in
+// that order, then counts down the stop latch and joins the excess log
+// deleter thread if it was created. 'initialized_' is left unchanged.
+void Collector::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+
+  const string self = ToString();
+  LOG(INFO) << self << " shutting down...";
+
+  reporter_->Shutdown();
+  metrics_collector_->Shutdown();
+  nodes_checker_->Shutdown();
+  cluster_rebalancer_->Shutdown();
+
+  // Wake the log deleter loop and wait for it to finish.
+  stop_background_threads_latch_.CountDown();
+  if (excess_log_deleter_thread_) {
+    excess_log_deleter_thread_->Join();
+  }
+
+  LOG(INFO) << self << " shutdown complete.";
+}
+
+// Returns the fixed component name used in this class's log messages.
+string Collector::ToString() const {
+ return "Collector";
+}
+
+// Deletes excess log files once synchronously (skipped when logging to
+// stderr) and then creates the "excess-log-deleter" background thread.
+//
+// Returns a non-OK Status if the initial deletion fails or if Thread::Create()
+// fails.
+Status Collector::StartExcessLogFileDeleterThread() {
+ // Try synchronously deleting excess log files once at startup to make sure it
+ // works, then start a background thread to continue deleting them in the
+ // future.
+ if (!FLAGS_logtostderr) {
+ RETURN_NOT_OK_PREPEND(DeleteExcessLogFiles(Env::Default()),
+ "Unable to delete excess log files");
+ }
+ return Thread::Create("server", "excess-log-deleter", &Collector::ExcessLogFileDeleterThread,
+ this, &excess_log_deleter_thread_);
+}
+
+// Body of the "excess-log-deleter" thread: every 60 seconds, until the stop
+// latch is released, tries to delete excess glog files. Failures are reported
+// through WARN_NOT_OK and do not end the loop.
+void Collector::ExcessLogFileDeleterThread() {
+  // Interval between attempts to clean up excess glog files.
+  const MonoDelta kCleanupInterval = MonoDelta::FromSeconds(60);
+  for (;;) {
+    if (stop_background_threads_latch_.WaitFor(kCleanupInterval)) {
+      break;
+    }
+    WARN_NOT_OK(DeleteExcessLogFiles(Env::Default()), "Unable to delete excess log files");
+  }
+}
+
+// Checks the collection interval and the timeout (Init() passes the values
+// of --collector_interval_sec and --collector_timeout_sec). Valid values
+// satisfy 10 <= interval <= 60 and 0 < timeout < interval.
+//
+// Returns Status::OK() when both constraints hold, otherwise
+// Status::InvalidArgument describing the accepted ranges.
+Status Collector::ValidateIntervalAndTimeout(int interval, int timeout) {
+  const bool interval_in_range = interval >= 10 && interval <= 60;
+  const bool timeout_in_range = timeout > 0 && timeout < interval;
+  if (!interval_in_range || !timeout_in_range) {
+    return Status::InvalidArgument(
+        Substitute("Invalid interval '$0'(should in range [10, 60]), "
+                   "or invalid timeout '$1'(should in range (0, interval))", interval, timeout));
+  }
+
+  return Status::OK();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector.h b/src/kudu/collector/collector.h
new file mode 100644
index 0000000..8e4e236
--- /dev/null
+++ b/src/kudu/collector/collector.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+} // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class ClusterRebalancer;
+class MetricsCollector;
+class NodesChecker;
+class ReporterBase;
+
+// Top-level object of the collector process. Owns the reporter, nodes
+// checker, metrics collector and cluster rebalancer, plus a background thread
+// that periodically deletes excess log files.
+//
+// Expected call order: Init(), Start(), Shutdown(). The destructor also calls
+// Shutdown().
+class Collector {
+ public:
+ Collector();
+ ~Collector();
+
+ // Validates flags and creates and initializes all components.
+ Status Init();
+ // Starts the log deleter thread and all components.
+ Status Start();
+ // Shuts down all components and joins the log deleter thread; no-op before
+ // Init().
+ void Shutdown();
+
+ // Returns the component name used in log messages.
+ std::string ToString() const;
+
+ private:
+ FRIEND_TEST(TestCollector, TestValidateIntervalAndTimeout);
+
+ // Start thread to remove excess glog files.
+ Status StartExcessLogFileDeleterThread();
+ void ExcessLogFileDeleterThread();
+
+ // Returns OK if 10 <= interval <= 60 and 0 < timeout < interval,
+ // InvalidArgument otherwise.
+ static Status ValidateIntervalAndTimeout(int interval, int timeout);
+
+ // Set to true by a successful Init().
+ bool initialized_;
+
+ // Components created by Init().
+ scoped_refptr<ReporterBase> reporter_;
+ scoped_refptr<MetricsCollector> metrics_collector_;
+ scoped_refptr<NodesChecker> nodes_checker_;
+ scoped_refptr<ClusterRebalancer> cluster_rebalancer_;
+
+ // Counted down by Shutdown() to make the log deleter thread exit.
+ CountDownLatch stop_background_threads_latch_;
+ scoped_refptr<Thread> excess_log_deleter_thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(Collector);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector_main.cc b/src/kudu/collector/collector_main.cc
new file mode 100644
index 0000000..afc0768
--- /dev/null
+++ b/src/kudu/collector/collector_main.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/collector/collector.h"
+#include "kudu/collector/collector_util.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/init.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/version_info.h"
+
+namespace kudu {
+namespace collector {
+
+// Entry point of the collector process: parses flags, initializes logging,
+// logs the non-default flags and version info, then initializes and starts a
+// Collector. Any leftover positional argument prints a usage line and
+// returns 1. Init()/Start() failures abort the process via CHECK_OK.
+//
+// After startup the function sleeps in 60-second steps for as long as
+// RunOnceMode() is false; when it is true the function returns 0 right away
+// and 'collector' is destroyed on scope exit.
+static int CollectorMain(int argc, char** argv) {
+ InitKuduOrDie();
+
+ // Snapshot the flags before parsing so that GetNonDefaultFlags() can be
+ // given the pre-parse values below.
+ GFlagsMap default_flags = GetFlagsMap();
+
+ ParseCommandLineFlags(&argc, &argv, true);
+ if (argc != 1) {
+ std::cerr << "usage: " << argv[0] << std::endl;
+ return 1;
+ }
+ std::string nondefault_flags = GetNonDefaultFlags(default_flags);
+ InitGoogleLoggingSafe(argv[0]);
+
+ LOG(INFO) << "Collector non-default flags:\n"
+ << nondefault_flags << '\n'
+ << "Collector version:\n"
+ << VersionInfo::GetAllVersionInfo();
+
+ Collector collector;
+ LOG(INFO) << "Initializing collector...";
+ CHECK_OK(collector.Init());
+
+ LOG(INFO) << "Starting collector...";
+ CHECK_OK(collector.Start());
+
+ LOG(INFO) << "Collector successfully started.";
+ // Keep the main thread alive while the background threads do the work.
+ while (!RunOnceMode()) {
+ SleepFor(MonoDelta::FromSeconds(60));
+ }
+
+ return 0;
+}
+
+} // namespace collector
+} // namespace kudu
+
+// Program entry point; forwards to kudu::collector::CollectorMain().
+int main(int argc, char** argv) {
+ return kudu::collector::CollectorMain(argc, argv);
+}
diff --git a/src/kudu/collector/collector_util-test.cc b/src/kudu/collector/collector_util-test.cc
new file mode 100644
index 0000000..8b29c56
--- /dev/null
+++ b/src/kudu/collector/collector_util-test.cc
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/collector_util.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+namespace collector {
+
+// Verifies that ExtractHostName() strips a trailing ":port" from IP and host
+// name inputs and returns inputs without a port unchanged.
+TEST(TestCollectorUtil, TestExtractHostName) {
+ ASSERT_EQ(ExtractHostName("1.2.3.4:5555"), "1.2.3.4");
+ ASSERT_EQ(ExtractHostName("host-name.bj:5555"), "host-name.bj");
+ ASSERT_EQ(ExtractHostName("1.2.3.4"), "1.2.3.4");
+ ASSERT_EQ(ExtractHostName("host-name.bj"), "host-name.bj");
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/collector_util.cc b/src/kudu/collector/collector_util.cc
new file mode 100644
index 0000000..aa79c40
--- /dev/null
+++ b/src/kudu/collector/collector_util.cc
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions shared by the Kudu collector's components.
+
+#include "kudu/collector/collector_util.h"
+
+#include <stddef.h>
+
+#include <gflags/gflags_declare.h>
+
+DECLARE_string(collector_report_method);
+
+using std::string;
+
+namespace kudu {
+namespace collector {
+
+// Returns the portion of 'url' that precedes its first ':' character, e.g.
+// "1.2.3.4:5555" -> "1.2.3.4". If 'url' contains no ':', it is returned
+// unchanged.
+string ExtractHostName(const string& url) {
+  // find() yields npos when there is no ':'; substr(0, npos) then returns the
+  // whole string, so no separate branch is needed.
+  const size_t colon_pos = url.find(':');
+  return url.substr(0, colon_pos);
+}
+
+// Reports whether --collector_report_method equals "local". The flag is
+// sampled on the first call only; later calls return the cached answer.
+bool RunOnceMode() {
+  static bool is_local_report = (FLAGS_collector_report_method == "local");
+  return is_local_report;
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/collector_util.h b/src/kudu/collector/collector_util.h
new file mode 100644
index 0000000..f9badc8
--- /dev/null
+++ b/src/kudu/collector/collector_util.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions shared by the Kudu collector's components.
+
+#pragma once
+
+#include <string>
+
+namespace kudu {
+namespace collector {
+
+// Returns the part of 'url' before its first ':' (the whole string if there
+// is no ':'), e.g. "1.2.3.4:5555" -> "1.2.3.4".
+std::string ExtractHostName(const std::string& url);
+
+// Returns true if --collector_report_method is "local". The flag is read only
+// on the first call; later calls return the cached result.
+bool RunOnceMode();
+
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/falcon_reporter-test.cc b/src/kudu/collector/falcon_reporter-test.cc
new file mode 100644
index 0000000..85810c7
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter-test.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/falcon_reporter.h"
+
+#include <list>
+#include <string>
+#include <utility>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_macros.h"
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_falcon_metrics_version);
+DECLARE_int32(collector_interval_sec);
+
+using std::list;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+// Checks the JSON emitted by FalconReporter::SerializeItems() for a list of
+// zero, one and two items.
+TEST(TestFalconReporter, TestSerializeItems) {
+ // Pin the flags that end up in the "step" and "tags" fields of each item.
+ FLAGS_collector_interval_sec = 30;
+ FLAGS_collector_cluster_name = "test";
+ FLAGS_collector_falcon_metrics_version = 8;
+ scoped_refptr<FalconReporter> reporter(new FalconReporter());
+ list<scoped_refptr<ItemBase>> falcon_items;
+ string data;
+ // No items: 'data' stays empty.
+ ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+ ASSERT_EQ(data, "");
+
+ // One item: a JSON array holding a single object.
+ falcon_items.emplace_back(reporter->ConstructItem(
+ "tserver1",
+ "scan_count",
+ "host",
+ 1234567890,
+ 12345,
+ "COUNTER",
+ ""));
+ ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+ ASSERT_EQ(data, Substitute(
+ R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
+ R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
+ R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"}])*",
+ FLAGS_collector_interval_sec,
+ FLAGS_collector_cluster_name,
+ FLAGS_collector_falcon_metrics_version));
+
+ // Two items: objects appear in the order the items were appended.
+ falcon_items.emplace_back(reporter->ConstructItem(
+ "table1",
+ "disk_size",
+ "table",
+ 1234567891,
+ 67890,
+ "GAUGE",
+ ""));
+ ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
+ ASSERT_EQ(data, Substitute(
+ R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
+ R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
+ R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"},)*"
+ R"*({"endpoint":"table1","metric":"disk_size","timestamp":1234567891,)*"
+ R"*("step":$0,"value":67890,"counterType":"GAUGE",)*"
+ R"*("tags":"service=kudu,cluster=$1,level=table,v=$2"}])*",
+ FLAGS_collector_interval_sec,
+ FLAGS_collector_cluster_name,
+ FLAGS_collector_falcon_metrics_version));
+}
+
+// Builds 'count' GAUGE items whose values run from 0 to count - 1 and hands
+// them to 'reporter' via PushItems(). Intended to be invoked through
+// NO_FATALS(): a failed push is raised as a fatal gtest failure rather than
+// being silently ignored.
+void GenerateItems(const scoped_refptr<FalconReporter>& reporter, int count) {
+  list<scoped_refptr<ItemBase>> items;
+  for (int i = 0; i < count; ++i) {
+    items.emplace_back(reporter->ConstructItem("endpoint", "metric", "level", 0, i, "GAUGE", ""));
+  }
+  // PushItems() returns a Status; check it so a broken push cannot go unnoticed.
+  ASSERT_OK(reporter->PushItems(std::move(items)));
+}
+
+// Exercises the item buffer: items pushed through PushItems() must be visible
+// via HasItems() and must all come back out of PopItems().
+// NOTE(review): relies on --collector_direct_push keeping its default (false)
+// so that PushItems() buffers, and on 10 being below the default
+// --collector_report_batch_size so that one PopItems() call drains the buffer.
+TEST(TestFalconReporter, TestPushAndPopItems) {
+ scoped_refptr<FalconReporter> reporter(new FalconReporter());
+ ASSERT_FALSE(reporter->HasItems());
+ NO_FATALS(GenerateItems(reporter, 1));
+ ASSERT_TRUE(reporter->HasItems());
+ NO_FATALS(GenerateItems(reporter, 9));
+ ASSERT_TRUE(reporter->HasItems());
+
+ // 1 + 9 items were buffered; a single pop returns all of them.
+ list<scoped_refptr<ItemBase>> falcon_items;
+ reporter->PopItems(&falcon_items);
+ ASSERT_FALSE(reporter->HasItems());
+ ASSERT_EQ(falcon_items.size(), 10);
+
+ // The buffer is reusable after it has been drained.
+ NO_FATALS(GenerateItems(reporter, 5));
+ ASSERT_TRUE(reporter->HasItems());
+
+ list<scoped_refptr<ItemBase>> falcon_items2;
+ reporter->PopItems(&falcon_items2);
+ ASSERT_FALSE(reporter->HasItems());
+ ASSERT_EQ(falcon_items2.size(), 5);
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/falcon_reporter.cc b/src/kudu/collector/falcon_reporter.cc
new file mode 100644
index 0000000..e492fd7
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter.cc
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/falcon_reporter.h"
+
+#include <kudu/util/curl_util.h>
+#include <stddef.h>
+
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+// Flags that control how collected items are delivered to the falcon agent.
+// With --collector_direct_push unset, items are buffered and drained by
+// --collector_falcon_pusher_count background tasks, at most
+// --collector_report_batch_size items per request.
+DEFINE_bool(collector_direct_push, false,
+ "Whether to push collected items to falcon agent directly, "
+ "otherwise items will be cached and then pushed to falcon "
+ "agent asynchronous");
+DEFINE_string(collector_falcon_agent, "http://127.0.0.1:1988/v1/push",
+ "The falcon agent URL to push metrics to");
+DEFINE_int32(collector_falcon_metrics_version, 4,
+ "Version of metrics pushed to falcon, it will be tagged in "
+ "'tag' section of an item");
+DEFINE_int32(collector_falcon_pusher_count, 4,
+ "Thread count to push collected items to falcon agent");
+DEFINE_int32(collector_report_batch_size, 1000,
+ "Count of items will be pushed to falcon agent by batch");
+// Applied as the curl timeout of every push request.
+DEFINE_int32(collector_push_timeout_ms, 20,
+ "Timeout for pushing items to falcon agent");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using std::list;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+// Creates a reporter that is not yet initialized. The stop latch is created
+// with an initial value of 1; Shutdown() counts it down.
+FalconReporter::FalconReporter()
+ : initialized_(false),
+ stop_background_threads_latch_(1) {
+}
+
+// Shuts the reporter down on destruction so callers need not do it explicitly.
+FalconReporter::~FalconReporter() {
+ Shutdown();
+}
+
+// Probes the falcon agent by POSTing an empty body to
+// --collector_falcon_agent and, if that succeeds, marks the reporter as
+// initialized. On failure the POST error is returned and the reporter stays
+// uninitialized. Must not be called on an already-initialized reporter.
+Status FalconReporter::Init() {
+  CHECK(!initialized_);
+
+  // Connectivity check only: the response body is discarded.
+  faststring ignored_response;
+  EasyCurl probe;
+  const Status probe_status =
+      probe.PostToURL(FLAGS_collector_falcon_agent, "", &ignored_response);
+  if (!probe_status.ok()) {
+    return probe_status;
+  }
+
+  initialized_ = true;
+  return Status::OK();
+}
+
+// Starts background delivery. In direct-push mode there is nothing to start;
+// otherwise the pusher thread pool is created and its tasks are submitted.
+// Requires a prior successful Init().
+Status FalconReporter::Start() {
+  CHECK(initialized_);
+
+  if (FLAGS_collector_direct_push) {
+    // Items go to the agent synchronously from PushItems(); no pool needed.
+    return Status::OK();
+  }
+  return StartFalconPusherThreadPool();
+}
+
+// Signals the background pusher tasks to stop and waits for them to finish.
+// Does nothing unless Init() has succeeded. Also invoked by the destructor.
+void FalconReporter::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+  const string name = ToString();
+  LOG(INFO) << name << " shutting down...";
+
+  stop_background_threads_latch_.CountDown();
+
+  // The pool only exists once Start() has built it, which it never does in
+  // direct-push mode (and Start() may not have been called at all), so it must
+  // not be dereferenced unconditionally.
+  if (pusher_thread_pool_) {
+    pusher_thread_pool_->Wait();
+  }
+
+  LOG(INFO) << name << " shutdown complete.";
+}
+
+// Returns the fixed name of this reporter; used as a prefix in log messages.
+string FalconReporter::ToString() const {
+ return "FalconReporter";
+}
+
+// Builds a FalconItem for one metric sample.
+//
+// The item's tags are "service=kudu,cluster=<--collector_cluster_name>,
+// level=<level>,v=<--collector_falcon_metrics_version>", followed by
+// ",<extra_tags>" when 'extra_tags' is non-empty. Its step is
+// --collector_interval_sec.
+scoped_refptr<ItemBase> FalconReporter::ConstructItem(string endpoint,
+                                                      string metric,
+                                                      string level,
+                                                      uint64_t timestamp,
+                                                      int64_t value,
+                                                      string counter_type,
+                                                      string extra_tags) {
+  // Caller-supplied tags, if any, are appended after the standard ones.
+  const string extra_suffix = extra_tags.empty() ? "" : "," + extra_tags;
+  string tags = Substitute("service=kudu,cluster=$0,level=$1,v=$2$3",
+                           FLAGS_collector_cluster_name, level,
+                           FLAGS_collector_falcon_metrics_version,
+                           extra_suffix);
+  return scoped_refptr<ItemBase>(new FalconItem(std::move(endpoint),
+                                                std::move(metric),
+                                                std::move(tags),
+                                                timestamp,
+                                                FLAGS_collector_interval_sec,
+                                                value,
+                                                std::move(counter_type)));
+}
+
+// Accepts a batch of items for delivery. In direct-push mode the batch is
+// sent to the agent right away and the send status is returned; otherwise the
+// items are appended to the internal buffer under 'items_lock_'.
+Status FalconReporter::PushItems(list<scoped_refptr<ItemBase>> items) {
+  if (FLAGS_collector_direct_push) {
+    return PushToAgent(std::move(items));
+  }
+
+  std::lock_guard<RWMutex> l(items_lock_);
+  buffer_items_.splice(buffer_items_.end(), std::move(items));
+  return Status::OK();
+}
+
+// Builds the "falcon-pusher" thread pool, sized to exactly
+// --collector_falcon_pusher_count threads, and submits one FalconPusher()
+// task per thread. Returns the first build or submit error, if any.
+Status FalconReporter::StartFalconPusherThreadPool() {
+  const int pusher_count = FLAGS_collector_falcon_pusher_count;
+  RETURN_NOT_OK(ThreadPoolBuilder("falcon-pusher")
+                    .set_min_threads(pusher_count)
+                    .set_max_threads(pusher_count)
+                    .set_idle_timeout(MonoDelta::FromMilliseconds(1))
+                    .Build(&pusher_thread_pool_));
+
+  int submitted = 0;
+  while (submitted < pusher_count) {
+    RETURN_NOT_OK(pusher_thread_pool_->SubmitFunc([this]() { FalconPusher(); }));
+    ++submitted;
+  }
+  return Status::OK();
+}
+
+// Body of each background pusher task: keeps calling ReportItems() until the
+// buffer is empty and the stop latch wait reports true.
+void FalconReporter::FalconPusher() {
+  const MonoDelta poll_interval = MonoDelta::FromSeconds(1);
+  for (;;) {
+    // Evaluation order matters: the latch is only waited on when the buffer is
+    // empty, so buffered items are drained without the one-second pause.
+    // NOTE(review): WaitFor() presumably returns true once the latch has been
+    // counted down and false on timeout -- confirm against CountDownLatch.
+    if (!HasItems() && stop_background_threads_latch_.WaitFor(poll_interval)) {
+      break;
+    }
+    ReportItems();
+  }
+}
+
+// Pops one batch from the buffer and pushes it to the agent. A failed push is
+// logged as a warning, not propagated. When the whole round takes longer than
+// --collector_warn_threshold_ms, the collected trace is dumped to the log.
+void FalconReporter::ReportItems() {
+  const MonoTime begin = MonoTime::Now();
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "FalconReporter::ReportItems");
+  TRACE("init");
+
+  list<scoped_refptr<ItemBase>> batch;
+  PopItems(&batch);
+  WARN_NOT_OK(PushToAgent(std::move(batch)), "PushToAgent failed");
+
+  const int64_t elapsed_ms = (MonoTime::Now() - begin).ToMilliseconds();
+  if (elapsed_ms <= FLAGS_collector_warn_threshold_ms) {
+    return;
+  }
+  // Slow round: surface the trace, if one is attached to this thread.
+  if (Trace* current = Trace::CurrentTrace()) {
+    LOG(WARNING) << "Trace:" << std::endl
+                 << current->DumpToString();
+  }
+}
+
+// Returns true if at least one item is waiting in the buffer. Takes
+// 'items_lock_' for the duration of the check.
+bool FalconReporter::HasItems() const {
+ std::lock_guard<RWMutex> l(items_lock_);
+ return !buffer_items_.empty();
+}
+
+// Moves up to --collector_report_batch_size items from the front of the
+// buffer to the end of '*falcon_items'. 'falcon_items' must not be null.
+//
+// A non-positive batch size is treated as "no limit". Negative values already
+// behaved that way (the cast to size_t made them huge), but a value of 0 used
+// to pop nothing while HasItems() stayed true, leaving the pusher loop
+// spinning forever.
+void FalconReporter::PopItems(list<scoped_refptr<ItemBase>>* falcon_items) {
+  CHECK(falcon_items);
+  // size_t rather than int: list::size() is unsigned and must not be narrowed.
+  size_t items_left = 0;
+  {
+    std::lock_guard<RWMutex> l(items_lock_);
+    size_t batch_size = buffer_items_.size();
+    if (FLAGS_collector_report_batch_size > 0) {
+      batch_size = std::min(batch_size,
+                            static_cast<size_t>(FLAGS_collector_report_batch_size));
+    }
+    auto end_item = buffer_items_.begin();
+    std::advance(end_item, batch_size);
+    falcon_items->splice(falcon_items->end(), buffer_items_, buffer_items_.begin(), end_item);
+    items_left = buffer_items_.size();
+  }
+  // A very large backlog means the pushers cannot keep up; make it visible.
+  if (items_left > 1000000) {
+    // No std::endl: the logger terminates the line itself.
+    LOG(INFO) << "Items left " << items_left;
+  }
+  TRACE(Substitute("Pop items, count $0", falcon_items->size()));
+}
+
+// Serializes 'falcon_items' to JSON and POSTs the result to
+// --collector_falcon_agent, using --collector_push_timeout_ms as the request
+// timeout. Returns the serialization or HTTP error, if any.
+//
+// An empty batch is a successful no-op. The pusher tasks call this on every
+// idle poll, and an empty list would otherwise serialize to an empty payload
+// and cost an HTTP request that carries no data.
+Status FalconReporter::PushToAgent(list<scoped_refptr<ItemBase>> falcon_items) {
+  if (falcon_items.empty()) {
+    return Status::OK();
+  }
+
+  string data;
+  RETURN_NOT_OK(SerializeItems(std::move(falcon_items), &data));
+
+  EasyCurl curl;
+  faststring dst;  // Response body; not inspected.
+  curl.set_timeout(MonoDelta::FromMilliseconds(FLAGS_collector_push_timeout_ms));
+  RETURN_NOT_OK(curl.PostToURL(FLAGS_collector_falcon_agent, data, &dst));
+  TRACE(Substitute("Pushed items to agent, size $0", data.size()));
+  return Status::OK();
+}
+
+// Serializes 'items' into '*data' as a compact JSON array, one object per
+// item with the keys "endpoint", "metric", "timestamp", "step", "value",
+// "counterType" and "tags", in that order.
+//
+// '*data' is always reset first, so an empty 'items' yields an empty string
+// instead of whatever the caller's string held before. Returns
+// InvalidArgument, leaving '*data' empty, if any entry is null or is not a
+// FalconItem. 'data' must not be null.
+Status FalconReporter::SerializeItems(list<scoped_refptr<ItemBase>> items, string* data) {
+  CHECK(data);
+  data->clear();
+  if (items.empty()) {
+    return Status::OK();
+  }
+  std::ostringstream str;
+  JsonWriter jw(&str, JsonWriter::COMPACT);
+  jw.StartArray();
+  for (const auto& item : items) {
+    // Plain pointer: 'items' keeps the object alive, so no extra reference is
+    // needed. The cast yields null both for a null entry and for an item of
+    // another reporter's type; reject those instead of dereferencing null.
+    const auto* falcon_item = dynamic_cast<const FalconItem*>(item.get());
+    if (falcon_item == nullptr) {
+      return Status::InvalidArgument("item is not a FalconItem");
+    }
+    jw.StartObject();
+    jw.String("endpoint");
+    jw.String(falcon_item->endpoint);
+    jw.String("metric");
+    jw.String(falcon_item->metric);
+    jw.String("timestamp");
+    jw.Uint64(falcon_item->timestamp);
+    jw.String("step");
+    jw.Int(falcon_item->step);
+    jw.String("value");
+    jw.Int64(falcon_item->value);
+    jw.String("counterType");
+    jw.String(falcon_item->counter_type);
+    jw.String("tags");
+    jw.String(falcon_item->tags);
+    jw.EndObject();
+  }
+  jw.EndArray();
+  *data = str.str();
+  TRACE(Substitute("SerializeItems done, count $0, size $1", items.size(), data->size()));
+  return Status::OK();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/falcon_reporter.h b/src/kudu/collector/falcon_reporter.h
new file mode 100644
index 0000000..3b3cde5
--- /dev/null
+++ b/src/kudu/collector/falcon_reporter.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class ThreadPool;
+} // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+// Open-Falcon is a distributed and high-performance monitoring system,
+// see more details http://open-falcon.com
+//
+// One metric sample in the form pushed to the falcon agent. Each field is
+// written to the JSON payload by FalconReporter::SerializeItems().
+struct FalconItem : public ItemBase {
+ // Arguments, in order: endpoint, metric, tags, timestamp, step, value,
+ // counter type. String arguments are moved into the item.
+ FalconItem(std::string ep, std::string m, std::string t,
+ uint64_t ts, int32_t s, int64_t v, std::string ct)
+ : endpoint(std::move(ep)),
+ metric(std::move(m)),
+ tags(std::move(t)),
+ timestamp(ts),
+ step(s),
+ value(v),
+ counter_type(std::move(ct)) {
+ }
+ ~FalconItem() override = default;
+
+ // Serialized as "endpoint".
+ std::string endpoint;
+ // Serialized as "metric".
+ std::string metric;
+ // Serialized as "tags"; a comma-separated list of key=value pairs.
+ std::string tags;
+ // Serialized as "timestamp".
+ uint64_t timestamp;
+ // Serialized as "step"; FalconReporter fills it from --collector_interval_sec.
+ int32_t step;
+ // Serialized as "value".
+ int64_t value;
+ // Serialized as "counterType", e.g. "COUNTER" or "GAUGE".
+ std::string counter_type;
+};
+
+// Reporter that delivers collected items to a falcon agent over HTTP, either
+// synchronously from PushItems() or through a buffer drained by background
+// pusher tasks, depending on --collector_direct_push.
+class FalconReporter : public ReporterBase {
+ public:
+ FalconReporter();
+ ~FalconReporter() override;
+
+ // Probes the agent URL and marks the reporter as initialized on success.
+ Status Init() override;
+ // Starts the pusher thread pool unless direct push is enabled.
+ Status Start() override;
+ // Signals the pusher tasks to stop; does nothing if Init() has not succeeded.
+ void Shutdown() override;
+
+ std::string ToString() const override;
+
+ // Builds a FalconItem from the given sample; 'level' and 'extra_tags' end up
+ // in the item's tags.
+ scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+ std::string metric,
+ std::string level,
+ uint64_t timestamp,
+ int64_t value,
+ std::string counter_type,
+ std::string extra_tags) override;
+
+ // Sends 'items' to the agent (direct push) or appends them to the buffer.
+ Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;
+
+ private:
+ FRIEND_TEST(TestFalconReporter, TestSerializeItems);
+ FRIEND_TEST(TestFalconReporter, TestPushAndPopItems);
+
+ // Builds the thread pool and submits one FalconPusher() task per thread.
+ Status StartFalconPusherThreadPool();
+ // Loop run by each pusher task; calls ReportItems() until told to stop.
+ void FalconPusher();
+
+ // Returns true if the buffer is non-empty.
+ bool HasItems() const;
+ // Pops one batch from the buffer and pushes it to the agent.
+ void ReportItems();
+ // Moves up to --collector_report_batch_size buffered items to the end of
+ // '*falcon_items'.
+ void PopItems(std::list<scoped_refptr<ItemBase>>* falcon_items);
+ // Serializes 'falcon_items' and POSTs them to --collector_falcon_agent.
+ static Status PushToAgent(std::list<scoped_refptr<ItemBase>> falcon_items);
+ // Serializes 'items' as a compact JSON array into '*data'.
+ static Status SerializeItems(std::list<scoped_refptr<ItemBase>> items, std::string* data);
+
+ // Set by a successful Init().
+ bool initialized_;
+
+ // Counted down by Shutdown() to stop the pusher tasks.
+ CountDownLatch stop_background_threads_latch_;
+ // Created by StartFalconPusherThreadPool(); stays null in direct-push mode.
+ std::unique_ptr<ThreadPool> pusher_thread_pool_;
+
+ // Guards 'buffer_items_'.
+ mutable RWMutex items_lock_;
+ // Items accepted by PushItems() and not yet popped for delivery.
+ std::list<scoped_refptr<ItemBase>> buffer_items_;
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/local_reporter.cc b/src/kudu/collector/local_reporter.cc
new file mode 100644
index 0000000..04110bb
--- /dev/null
+++ b/src/kudu/collector/local_reporter.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/local_reporter.h"
+
+#include <iostream>
+
+#include <glog/logging.h>
+
+#include "kudu/util/status.h"
+
+using std::list;
+using std::string;
+
+namespace kudu {
+namespace collector {
+
+// Creates a reporter that is not yet initialized.
+LocalReporter::LocalReporter()
+ : initialized_(false) {
+}
+
+// Runs Shutdown() on destruction.
+LocalReporter::~LocalReporter() {
+ Shutdown();
+}
+
+// Marks the reporter as initialized; there is nothing else to set up.
+// Must not be called twice (CHECK-enforced).
+Status LocalReporter::Init() {
+ CHECK(!initialized_);
+
+ initialized_ = true;
+ return Status::OK();
+}
+
+// Nothing to start for local reporting; only verifies that Init() was called.
+Status LocalReporter::Start() {
+ CHECK(initialized_);
+
+ return Status::OK();
+}
+
+// Logs the shutdown of an initialized reporter. There are no resources to
+// release, so an uninitialized reporter is left alone without logging.
+void LocalReporter::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+  const string reporter_name = ToString();
+  LOG(INFO) << reporter_name << " shutting down...";
+
+  LOG(INFO) << reporter_name << " shutdown complete.";
+}
+
+// Returns the fixed name of this reporter; used as a prefix in log messages.
+string LocalReporter::ToString() const {
+ return "LocalReporter";
+}
+
+// Prints the sample to stdout as
+//   "<level> <metric> <endpoint> [<extra_tags> ]<value>"
+// and returns a null item: local reporting has nothing to push later.
+// 'timestamp' and 'counter_type' are ignored. Output is serialized through
+// 'output_lock_' so concurrent callers do not interleave their lines.
+scoped_refptr<ItemBase> LocalReporter::ConstructItem(string endpoint,
+                                                     string metric,
+                                                     string level,
+                                                     uint64_t /*timestamp*/,
+                                                     int64_t value,
+                                                     string /*counter_type*/,
+                                                     string extra_tags) {
+  // The optional tags column carries its own trailing separator.
+  const string tags_column = extra_tags.empty() ? "" : extra_tags + " ";
+
+  MutexLock l(output_lock_);
+  std::cout << level << " " << metric << " " << endpoint << " "
+            << tags_column << value << std::endl;
+  return nullptr;
+}
+
+// No-op: samples are already printed by ConstructItem(), so the item list is
+// ignored and OK is always returned.
+Status LocalReporter::PushItems(list<scoped_refptr<ItemBase>> /*items*/) {
+ return Status::OK();
+}
+
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/local_reporter.h b/src/kudu/collector/local_reporter.h
new file mode 100644
index 0000000..d796b3b
--- /dev/null
+++ b/src/kudu/collector/local_reporter.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <string>
+
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace collector {
+
+// Reporter that writes every sample to stdout instead of sending it to a
+// remote service.
+class LocalReporter : public ReporterBase {
+ public:
+ LocalReporter();
+ ~LocalReporter() override;
+
+ // Marks the reporter as initialized.
+ Status Init() override;
+ // Verifies that Init() was called; nothing else to start.
+ Status Start() override;
+ // Logs the shutdown if the reporter was initialized.
+ void Shutdown() override;
+
+ std::string ToString() const override;
+
+ // Prints the sample to stdout and returns a null item. 'timestamp' and
+ // 'counter_type' are not used.
+ scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+ std::string metric,
+ std::string level,
+ uint64_t timestamp,
+ int64_t value,
+ std::string counter_type,
+ std::string extra_tags) override;
+
+ // Ignores 'items' and returns OK.
+ Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;
+
+ private:
+ // Set by Init().
+ bool initialized_;
+ // Serializes writes to stdout in ConstructItem().
+ Mutex output_lock_;
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/metrics_collector-test.cc b/src/kudu/collector/metrics_collector-test.cc
new file mode 100644
index 0000000..4a3e050
--- /dev/null
+++ b/src/kudu/collector/metrics_collector-test.cc
@@ -0,0 +1,777 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/metrics_collector.h"
+
+#include <stdint.h>
+
+#include <map>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/collector/local_reporter.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+DECLARE_bool(collector_request_merged_metrics);
+DECLARE_string(collector_attributes);
+DECLARE_string(collector_cluster_level_metrics);
+DECLARE_string(collector_metrics);
+DECLARE_string(collector_table_names);
+DECLARE_string(collector_metrics_types_for_test);
+
+using std::map;
+using std::set;
+using std::string;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace collector {
+
+// Assembles a MetricsCollector for tests, wired to a LocalReporter and to a
+// NodesChecker that shares the same reporter.
+scoped_refptr<MetricsCollector> BuildCollector() {
+  scoped_refptr<ReporterBase> local_reporter(new LocalReporter());
+  scoped_refptr<NodesChecker> checker(new NodesChecker(local_reporter));
+  scoped_refptr<MetricsCollector> collector(new MetricsCollector(checker, local_reporter));
+  return collector;
+}
+
+// ConvertStateToInt() must yield 1 only for state strings made up solely of
+// "RUNNING" and 0 for everything else listed here, including the empty string.
+TEST(TestMetricsCollector, TestConvertStateToInt) {
+  const struct {
+    const char* state;
+    int64_t expected;
+  } kCases[] = {
+    {"", 0},
+    {"STOPPED", 0},
+    {"RUNNINGSTOPPED", 0},
+    {"RUNNINGBOOTSTRAPPING", 0},
+    {"RUNNING", 1},
+    {"RUNNINGRUNNING", 1},
+  };
+
+  // Starts at 1 so the first (empty-string) case proves the output is written.
+  int64_t result = 1;
+  for (const auto& test_case : kCases) {
+    ASSERT_OK(MetricsCollector::ConvertStateToInt(test_case.state, &result));
+    ASSERT_EQ(result, test_case.expected);
+  }
+}
+
+// Checks the single value GetHistValue() derives from a list of histogram
+// samples.
+TEST(TestMetricsCollector, TestGetHistValue) {
+ // A single sample yields its own value.
+ {
+ vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100}});
+ ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 100);
+ }
+ // NOTE(review): 167 matches (10 * 100 + 20 * 200) / 30 = 166.67 rounded,
+ // i.e. presumably an average weighted by the first field -- confirm against
+ // GetHistValue().
+ {
+ vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100},
+ {20, 200}});
+ ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 167);
+ }
+}
+
+// Checks MergeToTableLevelMetrics(), which folds per-host table metrics into
+// one set of metrics per table.
+TEST(TestMetricsCollector, TestMergeToTableLevelMetrics) {
+ // Merge empty metrics.
+ {
+ vector<MetricsCollector::TablesMetrics> hosts_tables_metrics;
+ vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics;
+ MetricsCollector::TablesMetrics tables_metrics;
+ MetricsCollector::TablesHistMetrics tables_hist_metrics;
+ ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics(
+ hosts_tables_metrics, hosts_tables_hist_metrics,
+ &tables_metrics, &tables_hist_metrics));
+ ASSERT_TRUE(tables_metrics.empty());
+ ASSERT_TRUE(tables_hist_metrics.empty());
+ }
+ // Merge multi metrics.
+ {
+ vector<MetricsCollector::TablesMetrics> hosts_tables_metrics{
+ { // host-1
+ {
+ "table1",
+ {
+ {"metric1", 1},
+ {"metric2", 2}
+ }
+ },
+ {
+ "table2",
+ {
+ {"metric1", 100},
+ {"metric3", 200}
+ }
+ }
+ },
+ { // host-2
+ {
+ "table1",
+ {
+ {"metric1", 100},
+ {"metric2", 200}
+ }
+ },
+ {
+ "table2",
+ {
+ {"metric1", 1},
+ {"metric2", 2}
+ }
+ },
+ {
+ "table3",
+ {
+ {"metric1", 1},
+ {"metric2", 2}
+ }
+ }
+ }
+ };
+ vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics{
+ { // host-1
+ {
+ "table1",
+ {
+ {
+ "metric3",
+ {
+ {10, 100},
+ {20, 200}
+ }
+ },
+ {
+ "metric4",
+ {
+ {30, 300},
+ {40, 400}
+ }
+ }
+ }
+ },
+ {
+ "table2",
+ {
+ {
+ "metric3",
+ {
+ {10, 200},
+ {20, 300}
+ }
+ },
+ {
+ "metric4",
+ {
+ {40, 300},
+ {50, 400}
+ }
+ }
+ }
+ }
+ },
+ { // host-2
+ {
+ "table1",
+ {
+ {
+ "metric3",
+ {
+ {10, 100},
+ {20, 200}
+ }
+ },
+ {
+ "metric4",
+ {
+ {30, 300},
+ {40, 400}
+ }
+ }
+ }
+ },
+ {
+ "table2",
+ {
+ {
+ "metric3",
+ {
+ {10, 200},
+ {20, 300}
+ }
+ },
+ {
+ "metric4",
+ {
+ {40, 300},
+ {50, 400}
+ }
+ }
+ }
+ },
+ {
+ "table3",
+ {
+ {
+ "metric3",
+ {
+ {10, 200},
+ {20, 300}
+ }
+ },
+ {
+ "metric4",
+ {
+ {40, 300},
+ {50, 400}
+ }
+ }
+ }
+ }
+ }
+ };
+ MetricsCollector::TablesMetrics tables_metrics;
+ MetricsCollector::TablesHistMetrics tables_hist_metrics;
+ ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics(
+ hosts_tables_metrics, hosts_tables_hist_metrics,
+ &tables_metrics, &tables_hist_metrics));
+ // Scalar metrics with the same table and metric name are summed across
+ // hosts (e.g. table1/metric1: 1 + 100 = 101); a metric or table reported by
+ // only one host is carried over unchanged.
+ ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+ {
+ "table1",
+ {
+ {"metric1", 101},
+ {"metric2", 202}
+ }
+ },
+ {
+ "table2",
+ {
+ {"metric1", 101},
+ {"metric2", 2},
+ {"metric3", 200},
+ }
+ },
+ {
+ "table3",
+ {
+ {"metric1", 1},
+ {"metric2", 2}
+ }
+ }
+ }));
+ // Histogram samples are not combined: the per-host samples are listed one
+ // after another, host-1 first.
+ ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+ {
+ "table1",
+ {
+ {
+ "metric3",
+ {
+ {10, 100},
+ {20, 200},
+ {10, 100},
+ {20, 200}
+ }
+ },
+ {
+ "metric4",
+ {
+ {30, 300},
+ {40, 400},
+ {30, 300},
+ {40, 400}
+ }
+ }
+ }
+ },
+ {
+ "table2",
+ {
+ {
+ "metric3",
+ {
+ {10, 200},
+ {20, 300},
+ {10, 200},
+ {20, 300}
+ }
+ },
+ {
+ "metric4",
+ {
+ {40, 300},
+ {50, 400},
+ {40, 300},
+ {50, 400}
+ }
+ }
+ }
+ },
+ {
+ "table3",
+ {
+ {
+ "metric3",
+ {
+ {10, 200},
+ {20, 300}
+ }
+ },
+ {
+ "metric4",
+ {
+ {40, 300},
+ {50, 400}
+ }
+ }
+ }
+ }
+ }));
+ }
+}
+
+// Checks MergeToClusterLevelMetrics(), which folds table-level metrics into
+// cluster-level totals.
+TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics) {
+ // Merge empty metrics.
+ {
+ MetricsCollector::TablesMetrics tables_metrics;
+ MetricsCollector::TablesHistMetrics tables_hist_metrics;
+ MetricsCollector::Metrics cluster_metrics;
+ ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics,
+ &cluster_metrics));
+ ASSERT_TRUE(cluster_metrics.empty());
+ }
+ // Merge multi metrics.
+ {
+ MetricsCollector::TablesMetrics tables_metrics(
+ {
+ {
+ "table1",
+ {
+ {"metric1", 100}
+ }
+ },
+ {
+ "table2",
+ {
+ {"metric1", 10},
+ {"metric2", 20}
+ }
+ },
+ {
+ "table3",
+ {
+ {"metric1", 1},
+ {"metric2", 2},
+ {"metric3", 3}
+ }
+ }
+ }
+ );
+ MetricsCollector::TablesHistMetrics tables_hist_metrics; // TODO(yingchun) not used now.
+ // Only "metric2" is pre-registered in the output map, and only it is
+ // expected afterwards: 20 + 2 = 22. "metric1" and "metric3" do not show up.
+ MetricsCollector::Metrics cluster_metrics({{"metric2", 0}});
+ ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics,
+ &cluster_metrics));
+ ASSERT_EQ(cluster_metrics, MetricsCollector::Metrics({
+ {
+ {"metric2", 22}
+ }
+ }));
+ }
+}
+
+// Checks how the metrics JSON returned by a server is parsed into table-level
+// and host-level metrics, with and without an attribute filter.
+TEST(TestMetricsCollector, TestParseMetrics) {
+ // Check ParseServerMetrics and ParseTabletMetrics.
+ {
+ string data;
+ JsonReader r(data);
+ const rapidjson::Value entity;
+ ASSERT_TRUE(MetricsCollector::ParseServerMetrics(r, &entity).IsNotSupported());
+ ASSERT_TRUE(MetricsCollector::ParseTabletMetrics(r, &entity).IsNotSupported());
+ }
+ // Check ParseTableMetrics.
+ {
+ auto collector = BuildCollector();
+ collector->metric_types_by_entity_type_["tablet"] = {
+ {"test_metric", "COUNTER"},
+ {"metric_counter1", "COUNTER"},
+ {"metric_counter2", "COUNTER"},
+ {"metric_histogram1", "HISTOGRAM"},
+ {"metric_histogram2", "HISTOGRAM"}
+ };
+ // Input: one "server" entity, one "tablet" entity and two "table" entities.
+ // Only the two "table" entities show up in the expectations below.
+ string data(
+ R"*([ )*"
+ R"*( { )*"
+ R"*( "type": "server", )*"
+ R"*( "id": "server1", )*"
+ R"*( "attributes": { )*"
+ R"*( "attrA": "val1", )*"
+ R"*( "attrB": "val2" )*"
+ R"*( }, )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "test_metric", )*"
+ R"*( "value": 123 )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "type": "tablet", )*"
+ R"*( "id": "tablet1", )*"
+ R"*( "attributes": { )*"
+ R"*( "attr1": "val1", )*"
+ R"*( "attr2": "val2" )*"
+ R"*( }, )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "test_metric", )*"
+ R"*( "value": 321 )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "type": "table", )*"
+ R"*( "id": "table1", )*"
+ R"*( "attributes": { )*"
+ R"*( "attr1": "val2", )*"
+ R"*( "attr2": "val3" )*"
+ R"*( }, )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "metric_counter1", )*"
+ R"*( "value": 10 )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "metric_counter2", )*"
+ R"*( "value": 20 )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "metric_histogram1", )*"
+ R"*( "total_count": 17, )*"
+ R"*( "min": 6, )*"
+ R"*( "mean": 47.8235, )*"
+ R"*( "percentile_75": 62, )*"
+ R"*( "percentile_95": 72, )*"
+ R"*( "percentile_99": 73, )*"
+ R"*( "percentile_99_9": 73, )*"
+ R"*( "percentile_99_99": 73, )*"
+ R"*( "max": 73, )*"
+ R"*( "total_sum": 813 )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "type": "table", )*"
+ R"*( "id": "table2", )*"
+ R"*( "attributes": { )*"
+ R"*( "attr1": "val3", )*"
+ R"*( "attr2": "val2" )*"
+ R"*( }, )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "metric_counter1", )*"
+ R"*( "value": 100 )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "metric_histogram1", )*"
+ R"*( "total_count": 170, )*"
+ R"*( "min": 60, )*"
+ R"*( "mean": 478.235, )*"
+ R"*( "percentile_75": 620, )*"
+ R"*( "percentile_95": 720, )*"
+ R"*( "percentile_99": 730, )*"
+ R"*( "percentile_99_9": 735, )*"
+ R"*( "percentile_99_99": 735, )*"
+ R"*( "max": 735, )*"
+ R"*( "total_sum": 8130 )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "metric_histogram2", )*"
+ R"*( "total_count": 34, )*"
+ R"*( "min": 6, )*"
+ R"*( "mean": 47.8235, )*"
+ R"*( "percentile_75": 62, )*"
+ R"*( "percentile_95": 72, )*"
+ R"*( "percentile_99": 72, )*"
+ R"*( "percentile_99_9": 73, )*"
+ R"*( "percentile_99_99": 73, )*"
+ R"*( "max": 73, )*"
+ R"*( "total_sum": 813 )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( } )*"
+ R"*(] )*");
+
+ // Attribute filter is empty.
+ {
+ MetricsCollector::TablesMetrics tables_metrics;
+ MetricsCollector::TablesHistMetrics tables_hist_metrics;
+ MetricsCollector::Metrics host_metrics;
+ MetricsCollector::HistMetrics host_hist_metrics;
+ ASSERT_OK(collector->ParseMetrics(data,
+ &tables_metrics, &host_metrics,
+ &tables_hist_metrics, &host_hist_metrics));
+ // Counters are reported per table under their own names.
+ ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+ {
+ "table1",
+ {
+ {"metric_counter1", 10},
+ {"metric_counter2", 20},
+ }
+ },
+ {
+ "table2",
+ {
+ {"metric_counter1", 100}
+ }
+ }
+ }));
+ // Each histogram is reported as "<name>_percentile_99" with a single
+ // {total_count, percentile_99} sample.
+ ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+ {
+ "table1",
+ {
+ {
+ "metric_histogram1_percentile_99",
+ {
+ {17, 73}
+ }
+ }
+ }
+ },
+ {
+ "table2",
+ {
+ {
+ "metric_histogram1_percentile_99",
+ {
+ {170, 730}
+ },
+ },
+ {
+ "metric_histogram2_percentile_99",
+ {
+ {34, 72}
+ }
+ }
+ }
+ }
+ }));
+ // Host-level counters are the sums over both tables (10 + 100 = 110).
+ ASSERT_EQ(host_metrics, MetricsCollector::Metrics({
+ {"metric_counter1", 110},
+ {"metric_counter2", 20}
+ }));
+ // Host-level histograms list the samples of both tables.
+ ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({
+ {
+ "metric_histogram1_percentile_99",
+ {
+ {17, 73},
+ {170, 730}
+ }
+ },
+ {
+ "metric_histogram2_percentile_99",
+ {
+ {34, 72}
+ }
+ }
+ }));
+ }
+
+ // Attribute filter is not empty.
+ {
+ // Keeps only entities whose "attr1" is "val1" or "val2": table1 has
+ // attr1 = "val2" and stays, table2 has attr1 = "val3" and is dropped.
+ collector->attributes_filter_ = {{"attr1", {"val1", "val2"}}};
+
+ MetricsCollector::TablesMetrics tables_metrics;
+ MetricsCollector::TablesHistMetrics tables_hist_metrics;
+ MetricsCollector::Metrics host_metrics;
+ MetricsCollector::HistMetrics host_hist_metrics;
+ ASSERT_OK(collector->ParseMetrics(data,
+ &tables_metrics, &host_metrics,
+ &tables_hist_metrics, &host_hist_metrics));
+ ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({
+ {
+ "table1",
+ {
+ {"metric_counter1", 10},
+ {"metric_counter2", 20},
+ }
+ }
+ }));
+ ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({
+ {
+ "table1",
+ {
+ {
+ "metric_histogram1_percentile_99",
+ {
+ {17, 73}
+ }
+ }
+ }
+ }
+ }));
+ // Host-level values now reflect table1 only.
+ ASSERT_EQ(host_metrics, MetricsCollector::Metrics({
+ {"metric_counter1", 10},
+ {"metric_counter2", 20}
+ }));
+ ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({
+ {
+ "metric_histogram1_percentile_99",
+ {
+ {17, 73},
+ }
+ }
+ }));
+ }
+ }
+}
+
+// Verifies that InitMetrics() builds metric_types_by_entity_type_ from a
+// metrics schema. The schema is injected through
+// FLAGS_collector_metrics_types_for_test, so InitMetrics() parses this JSON
+// instead of fetching it over HTTP.
+TEST(TestMetricsCollector, TestInitMetrics) {
+ // Fixture: two "tablet" entities that expose the same three metrics, plus
+ // one "server" entity. Metric types are given in lower case.
+ FLAGS_collector_metrics_types_for_test =
+ R"*([ )*"
+ R"*( { )*"
+ R"*( "type": "tablet", )*"
+ R"*( "id": "table1", )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "counter_metric1", )*"
+ R"*( "type": "counter" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "histogram_metric1", )*"
+ R"*( "type": "histogram" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "gauge_metric1", )*"
+ R"*( "type": "gauge" )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "type": "tablet", )*"
+ R"*( "id": "table2", )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "counter_metric1", )*"
+ R"*( "type": "counter" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "histogram_metric1", )*"
+ R"*( "type": "histogram" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "gauge_metric1", )*"
+ R"*( "type": "gauge" )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "type": "server", )*"
+ R"*( "metrics": [ )*"
+ R"*( { )*"
+ R"*( "name": "counter_metric2", )*"
+ R"*( "type": "counter" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "histogram_metric2", )*"
+ R"*( "type": "histogram" )*"
+ R"*( }, )*"
+ R"*( { )*"
+ R"*( "name": "gauge_metric2", )*"
+ R"*( "type": "gauge" )*"
+ R"*( } )*"
+ R"*( ] )*"
+ R"*( } )*"
+ R"*(] )*";
+ auto collector = BuildCollector();
+ ASSERT_OK(collector->InitMetrics());
+ // Expected result: one entry per entity type (the two tablet entities end up
+ // as a single "tablet" entry) and every metric type converted to upper case.
+ map<string, MetricsCollector::MetricTypes> expect_metric_types({
+ {
+ "tablet",
+ {
+ {"counter_metric1", "COUNTER"},
+ {"histogram_metric1", "HISTOGRAM"},
+ {"gauge_metric1", "GAUGE"},
+ }
+ },
+ {
+ "server",
+ {
+ {"counter_metric2", "COUNTER"},
+ {"histogram_metric2", "HISTOGRAM"},
+ {"gauge_metric2", "GAUGE"},
+ }
+ }
+ });
+ ASSERT_EQ(collector->metric_types_by_entity_type_, expect_metric_types);
+}
+
+// Verifies that InitFilters() parses FLAGS_collector_attributes
+// ("name:val,val;name:val") into the attribute name -> allowed values map.
+TEST(TestMetricsCollector, TestInitFilters) {
+  FLAGS_collector_attributes = "attr1:val1,val2;attr2:val1";
+
+  auto collector = BuildCollector();
+  ASSERT_OK(collector->InitFilters());
+
+  // Build the expectation entry by entry: attr1 accepts two values, attr2 one.
+  unordered_map<string, set<string>> expected_filter;
+  expected_filter["attr1"] = {"val1", "val2"};
+  expected_filter["attr2"] = {"val1"};
+  ASSERT_EQ(collector->attributes_filter_, expected_filter);
+}
+
+// Test helper: sets the four collector flags, builds a fresh collector, runs
+// InitFilters() followed by InitMetricsUrlParameters(), and asserts that the
+// resulting metric_url_parameters_ equals 'expect_url'.
+// Implemented as a macro (wrapped in do/while(false) so it behaves like a
+// single statement) so that the ASSERT_* macros apply to the calling test body.
+#define CHECK_URL_PARAMETERS(metrics, request_merged, attributes, table_names, expect_url) \
+do { \
+ FLAGS_collector_metrics = metrics; \
+ FLAGS_collector_request_merged_metrics = request_merged; \
+ FLAGS_collector_attributes = attributes; \
+ FLAGS_collector_table_names = table_names; \
+ auto collector = BuildCollector(); \
+ ASSERT_OK(collector->InitFilters()); \
+ ASSERT_OK(collector->InitMetricsUrlParameters()); \
+ ASSERT_EQ(collector->metric_url_parameters_, expect_url); \
+} while (false)
+
+// Verifies the URL suffix built by InitMetricsUrlParameters() for several
+// combinations of the metrics / attributes / table_names flags.
+TEST(TestMetricsCollector, TestInitMetricsUrlParameters) {
+ // No filters at all: only the fixed parameters are present.
+ CHECK_URL_PARAMETERS("", true, "", "",
+ "/metrics?compact=1&origin=false&merge=true");
+ // Metric name filter is passed through verbatim.
+ CHECK_URL_PARAMETERS("m1,m2,m3", true, "", "",
+ "/metrics?compact=1&metrics=m1,m2,m3&origin=false&merge=true");
+ // TODO(yingchun): now FLAGS_collector_request_merged_metrics must be true
+ //CHECK_URL_PARAMETERS("", false, "", "",
+ // "/metrics?compact=1");
+ // Attribute filter: emitted as "name,value," pairs, including a trailing comma.
+ // NOTE(review): the expected pair order looks like it depends on the iteration
+ // order of attributes_filter_ (built as an unordered_map in InitFilters) --
+ // confirm this is stable across standard library implementations.
+ CHECK_URL_PARAMETERS("", true, "attr1:a1,a2;attr2:a3", "",
+ "/metrics?compact=1&origin=false&merge=true&attributes=attr2,a3,attr1,a1,attr1,a2,");
+ // Table name filter is passed through verbatim.
+ CHECK_URL_PARAMETERS("", true, "", "t1,t2,t3",
+ "/metrics?compact=1&origin=false&merge=true&table_names=t1,t2,t3");
+}
+
+// Verifies that InitClusterLevelMetrics() turns the comma-separated flag into
+// a metric map in which every listed metric starts at 0.
+TEST(TestMetricsCollector, TestInitClusterLevelMetrics) {
+  FLAGS_collector_cluster_level_metrics = "m1,m2,m3";
+
+  auto collector = BuildCollector();
+  ASSERT_OK(collector->InitClusterLevelMetrics());
+
+  MetricsCollector::Metrics expected_metrics;
+  expected_metrics["m1"] = 0;
+  expected_metrics["m2"] = 0;
+  expected_metrics["m3"] = 0;
+  ASSERT_EQ(collector->cluster_metrics_, expected_metrics);
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/metrics_collector.cc b/src/kudu/collector/metrics_collector.cc
new file mode 100644
index 0000000..e5adfcb
--- /dev/null
+++ b/src/kudu/collector/metrics_collector.cc
@@ -0,0 +1,852 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/metrics_collector.h"
+
+#include <string.h>
+
+#include <cmath>
+#include <functional>
+#include <list>
+#include <ostream>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/collector/nodes_checker.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/zlib.h"
+
+// Command line flags that select which metrics the collector requests and how
+// it reports them.
+//
+// NOTE: MetricsCollector::Init() currently rejects a non-empty
+// --collector_attributes (see ValidateTableFilter()).
+DEFINE_string(collector_attributes, "",
+ "Entity attributes to collect (semicolon-separated list of entity attribute "
+ "name and values). e.g. attr_name1:attr_val1,attr_val2;attr_name2:attr_val3");
+DEFINE_string(collector_cluster_level_metrics, "on_disk_size,on_disk_data_size",
+ "Metric names which should be merged and pushed to cluster level view "
+ "(comma-separated list of metric names)");
+DEFINE_bool(collector_ignore_hosttable_level_metrics, false,
+ "Whether to ignore to report host-table level metrics.");
+DEFINE_string(collector_metrics, "",
+ "Metrics to collect (comma-separated list of metric names)");
+// When non-empty, InitMetrics() parses this value instead of fetching the
+// metrics schema from a master.
+DEFINE_string(collector_metrics_types_for_test, "",
+ "Only for test, used to initialize metric_types_by_entity_type_");
+// NOTE: InitMetricsUrlParameters() logs FATAL when this flag is false.
+DEFINE_bool(collector_request_merged_metrics, true,
+ "Whether to request merged metrics and exclude unmerged metrics from server");
+DEFINE_string(collector_table_names, "",
+ "Table names to collect (comma-separated list of table names)");
+
+DECLARE_string(collector_cluster_name);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using rapidjson::Value;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
+using std::unordered_map;
+using strings::Substitute;
+
+namespace kudu {
+namespace collector {
+
+// Histogram fields that ParseTableMetrics() extracts from each HISTOGRAM
+// metric; each one is reported as "<metric name>_<percentile>".
+const set<string> MetricsCollector::kRegisterPercentiles = {"percentile_99"};
+
+// Creates an uninitialized collector that keeps references to the given nodes
+// checker and reporter. Init() must be called before Start().
+// The stop latch starts at 1; Shutdown() counts it down to stop the
+// background thread.
+MetricsCollector::MetricsCollector(scoped_refptr<NodesChecker> nodes_checker,
+ scoped_refptr<ReporterBase> reporter)
+ : initialized_(false),
+ nodes_checker_(std::move(nodes_checker)),
+ reporter_(std::move(reporter)),
+ stop_background_threads_latch_(1) {
+}
+
+// Stops the background thread (if the collector was initialized) before the
+// members are destroyed.
+MetricsCollector::~MetricsCollector() {
+ Shutdown();
+}
+
+// One-time initialization. Validates the filter flags, loads the metric type
+// table, and precomputes the filters, the request URL suffix and the initial
+// cluster-level metrics. Returns the first error encountered; on success the
+// collector is marked as initialized.
+// Must not be called twice (enforced by CHECK).
+Status MetricsCollector::Init() {
+ CHECK(!initialized_);
+
+ RETURN_NOT_OK(ValidateTableFilter(FLAGS_collector_attributes, FLAGS_collector_table_names));
+ RETURN_NOT_OK(InitMetrics());
+ // InitFilters() must run before InitMetricsUrlParameters(), which reads
+ // attributes_filter_.
+ RETURN_NOT_OK(InitFilters());
+ CHECK(attributes_filter_.empty()); // TODO(yingchun) disable now
+ RETURN_NOT_OK(InitMetricsUrlParameters());
+ RETURN_NOT_OK(InitClusterLevelMetrics());
+
+ initialized_ = true;
+ return Status::OK();
+}
+
+// Starts the background collection thread. Init() must have succeeded first
+// (enforced by CHECK). Returns the status of the thread creation.
+Status MetricsCollector::Start() {
+  CHECK(initialized_);
+  return StartMetricCollectorThread();
+}
+
+// Signals the background thread to stop and waits for it to finish.
+// Does nothing if the collector was never initialized.
+void MetricsCollector::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+
+  const string collector_name = ToString();
+  LOG(INFO) << collector_name << " shutting down...";
+
+  // Wake up MetricCollectorThread(), which waits on this latch between rounds.
+  stop_background_threads_latch_.CountDown();
+
+  // The thread only exists once Start() has been called.
+  if (metric_collector_thread_) {
+    metric_collector_thread_->Join();
+  }
+
+  LOG(INFO) << collector_name << " shutdown complete.";
+}
+
+// Returns the fixed display name of this component, used in log messages.
+string MetricsCollector::ToString() const {
+ return "MetricsCollector";
+}
+
+// Creates the "metric-collector" thread running MetricCollectorThread() on this
+// object and stores its handle in metric_collector_thread_.
+Status MetricsCollector::StartMetricCollectorThread() {
+ return Thread::Create("server", "metric-collector", &MetricsCollector::MetricCollectorThread,
+ this, &metric_collector_thread_);
+}
+
+// Body of the background thread: runs one collection round, then sleeps until
+// FLAGS_collector_interval_sec after the start of that round. The loop ends
+// after the first round in run-once mode, or as soon as the stop latch is
+// counted down.
+void MetricsCollector::MetricCollectorThread() {
+  while (true) {
+    MonoTime next_round_time = MonoTime::Now();
+    // A failed round is only logged; the next round is still scheduled.
+    WARN_NOT_OK(CollectAndReportMetrics(), "Unable to collect metrics");
+    next_round_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
+
+    if (RunOnceMode()) {
+      break;
+    }
+    // WaitUntil() returns true when the latch reached zero, i.e. on shutdown.
+    if (stop_background_threads_latch_.WaitUntil(next_round_time)) {
+      break;
+    }
+  }
+  LOG(INFO) << "MetricCollectorThread exit";
+}
+
+// Makes sure the host collector thread pool has exactly 'thread_count'
+// threads. An existing pool of the right size is reused; otherwise the old pool
+// (if any) is shut down and a new fixed-size pool is built.
+Status MetricsCollector::UpdateThreadPool(int32_t thread_count) {
+  if (host_metric_collector_thread_pool_) {
+    if (host_metric_collector_thread_pool_->num_threads() == thread_count) {
+      // Already the right size, nothing to do.
+      return Status::OK();
+    }
+    host_metric_collector_thread_pool_->Shutdown();
+  }
+  TRACE("Old thread pool shutdown");
+
+  // min == max: the pool is sized to the requested thread count.
+  ThreadPoolBuilder builder("host-metric-collector");
+  builder.set_min_threads(thread_count)
+      .set_max_threads(thread_count)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(1));
+  RETURN_NOT_OK(builder.Build(&host_metric_collector_thread_pool_));
+  TRACE("New thread pool built");
+
+  return Status::OK();
+}
+
+// Validates the filter flags. Attribute filters are not supported yet, so any
+// non-empty 'attribute_filter' is rejected with InvalidArgument. The table
+// filter is currently not inspected.
+Status MetricsCollector::ValidateTableFilter(const string& attribute_filter,
+                                             const string& /*table_filter*/) {
+  if (!attribute_filter.empty()) {
+    return Status::InvalidArgument("attribute filter is not supported now");
+  }
+  return Status::OK();
+}
+
+// Loads the metric name -> metric type table for the "tablet" and "server"
+// entity types into metric_types_by_entity_type_.
+//
+// The schema is fetched from the first master's "/metrics?include_schema=1"
+// endpoint, or taken from FLAGS_collector_metrics_types_for_test when that flag
+// is set. Only the first entity of each handled type is used; entities of other
+// types are logged and skipped.
+//
+// Returns an error if the schema cannot be fetched or parsed. The member map
+// is only replaced when the whole schema was processed successfully.
+Status MetricsCollector::InitMetrics() {
+  string resp;
+  if (PREDICT_TRUE(FLAGS_collector_metrics_types_for_test.empty())) {
+    RETURN_NOT_OK(GetMetrics(
+        nodes_checker_->GetFirstMaster() + "/metrics?include_schema=1", &resp));
+  } else {
+    resp = FLAGS_collector_metrics_types_for_test;
+  }
+  JsonReader r(resp);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+
+  map<string, MetricTypes> metric_types_by_entity_type;
+  for (const Value* entity : entities) {
+    string entity_type;
+    // The response comes from a remote server: report malformed entities as an
+    // error instead of aborting the process.
+    RETURN_NOT_OK(r.ExtractString(entity, "type", &entity_type));
+    if (entity_type != "tablet" && entity_type != "server") {
+      LOG(WARNING) << "unhandled entity type " << entity_type;
+      continue;
+    }
+    // All entities of one type share the same schema, the first one is enough.
+    if (ContainsKey(metric_types_by_entity_type, entity_type)) {
+      continue;
+    }
+    MetricTypes metric_types;
+    // Propagate extraction failures; previously this status was dropped.
+    RETURN_NOT_OK(ExtractMetricTypes(r, entity, &metric_types));
+    EmplaceOrDie(&metric_types_by_entity_type,
+                 std::make_pair(entity_type, std::move(metric_types)));
+  }
+  metric_types_by_entity_type_.swap(metric_types_by_entity_type);
+  return Status::OK();
+}
+
+// Reads the "metrics" array of 'entity' and records each metric's name together
+// with its upper-cased type in 'metric_types'.
+//
+// Returns an error if the array, or a "name"/"type" field of one of its
+// elements, cannot be extracted. Dies if a metric name is already present in
+// 'metric_types'.
+Status MetricsCollector::ExtractMetricTypes(const JsonReader& r,
+                                            const Value* entity,
+                                            MetricTypes* metric_types) {
+  CHECK(metric_types);
+
+  vector<const Value*> metric_entries;
+  RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metric_entries));
+
+  for (const Value* metric_entry : metric_entries) {
+    string metric_name;
+    RETURN_NOT_OK(r.ExtractString(metric_entry, "name", &metric_name));
+
+    string raw_type;
+    RETURN_NOT_OK(r.ExtractString(metric_entry, "type", &raw_type));
+
+    // Types are stored in upper case ("COUNTER", "GAUGE", "HISTOGRAM").
+    string normalized_type;
+    ToUpperCase(raw_type, &normalized_type);
+
+    EmplaceOrDie(metric_types, std::make_pair(metric_name, normalized_type));
+  }
+  return Status::OK();
+}
+
+// Parses FLAGS_collector_attributes into attributes_filter_, a map from
+// attribute name to the set of accepted values.
+//
+// Expected format: "name1:val1,val2;name2:val3". Empty segments are skipped.
+// Returns InvalidArgument (instead of aborting the process) when an entry is
+// not of the form "name:values", has no values, or repeats an attribute name.
+// attributes_filter_ is only replaced when the whole flag parsed successfully.
+Status MetricsCollector::InitFilters() {
+  unordered_map<string, set<string>> attributes_filter;
+  vector<string> attribute_values_by_name_list =
+      Split(FLAGS_collector_attributes, ";", strings::SkipEmpty());
+  for (const auto& attribute_values_by_name : attribute_values_by_name_list) {
+    vector<string> attribute_name_and_values =
+        Split(attribute_values_by_name, ":", strings::SkipEmpty());
+    if (attribute_name_and_values.size() != 2) {
+      return Status::InvalidArgument(
+          Substitute("invalid attribute filter '$0', expected 'name:val1,val2'",
+                     attribute_values_by_name));
+    }
+    set<string> values(Split(attribute_name_and_values[1], ",", strings::SkipEmpty()));
+    if (values.empty()) {
+      return Status::InvalidArgument(
+          Substitute("attribute filter '$0' has no values", attribute_values_by_name));
+    }
+    if (!EmplaceIfNotPresent(&attributes_filter,
+                             std::make_pair(attribute_name_and_values[0], values))) {
+      return Status::InvalidArgument(
+          Substitute("duplicate attribute name '$0' in attribute filter",
+                     attribute_name_and_values[0]));
+    }
+  }
+  attributes_filter_.swap(attributes_filter);
+  return Status::OK();
+}
+
+// Builds metric_url_parameters_, the path and query string appended to every
+// tserver address when fetching metrics. Optional parts are added in this
+// order: metric names, merge mode, attribute filter, table names.
+//
+// Reads attributes_filter_, so InitFilters() has to run first.
+Status MetricsCollector::InitMetricsUrlParameters() {
+  string url_parameters = "/metrics?compact=1";
+
+  if (!FLAGS_collector_metrics.empty()) {
+    url_parameters += "&metrics=" + FLAGS_collector_metrics;
+  }
+
+  if (!FLAGS_collector_request_merged_metrics) {
+    LOG(FATAL) << "Non-merge mode is not supported now, you should set "
+                  "FLAGS_collector_request_merged_metrics to true if you "
+                  "want collector work well";
+  } else {
+    url_parameters += "&origin=false&merge=true";
+  }
+
+  // TODO(yingchun) This is supported since version 1.10
+  if (!attributes_filter_.empty()) {
+    url_parameters += "&attributes=";
+    // Emitted as a flat "name,value," list, one pair per accepted value.
+    for (const auto& name_and_values : attributes_filter_) {
+      for (const auto& accepted_value : name_and_values.second) {
+        url_parameters += Substitute("$0,$1,", name_and_values.first, accepted_value);
+      }
+    }
+  }
+
+  // TODO(yingchun) This is supported since internal version 1.8.0
+  if (!FLAGS_collector_table_names.empty()) {
+    url_parameters += "&table_names=" + FLAGS_collector_table_names;
+  }
+
+  metric_url_parameters_ = url_parameters;
+  return Status::OK();
+}
+
+// Initializes cluster_metrics_ with one zero-valued entry per metric name listed
+// in FLAGS_collector_cluster_level_metrics (comma-separated, empty names
+// skipped).
+Status MetricsCollector::InitClusterLevelMetrics() {
+  const vector<string> cluster_metric_names =
+      Split(FLAGS_collector_cluster_level_metrics, ",", strings::SkipEmpty());
+
+  Metrics initial_metrics;
+  for (const auto& name : cluster_metric_names) {
+    // A repeated name simply keeps its zero value.
+    initial_metrics.emplace(name, 0);
+  }
+
+  cluster_metrics_.swap(initial_metrics);
+  return Status::OK();
+}
+
+// Runs one complete collection round:
+//   1. asks the nodes checker for the current node addresses,
+//   2. fetches, parses and reports each host's metrics in parallel on the host
+//      collector thread pool (one task per host),
+//   3. merges the per-host results into table level and cluster level metrics,
+//   4. reports the table level and cluster level metrics.
+//
+// Returns OK without doing anything when there are no nodes. A failure to
+// collect from a single host is logged and does not fail the round; the
+// host simply contributes whatever was parsed before the failure.
+// If the round took longer than FLAGS_collector_warn_threshold_ms, the trace
+// of this round is logged as a warning.
+Status MetricsCollector::CollectAndReportMetrics() {
+  LOG(INFO) << "Start to CollectAndReportMetrics";
+  MonoTime start(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "MetricsCollector::CollectAndReportMetrics");
+  TRACE("init");
+  vector<string> tserver_http_addrs = nodes_checker_->GetNodes();
+  TRACE("Nodes got");
+  if (tserver_http_addrs.empty()) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(UpdateThreadPool(static_cast<int32_t>(tserver_http_addrs.size())));
+
+  // One output slot per host. The vectors are sized up front and never
+  // resized, so the pointers handed to the tasks stay valid.
+  vector<TablesMetrics> hosts_metrics_by_table_name(tserver_http_addrs.size());
+  vector<TablesHistMetrics> hosts_hist_metrics_by_table_name(tserver_http_addrs.size());
+  for (size_t i = 0; i < tserver_http_addrs.size(); ++i) {
+    const string url = tserver_http_addrs[i] + metric_url_parameters_;
+    TablesMetrics* host_tables_metrics = &hosts_metrics_by_table_name[i];
+    TablesHistMetrics* host_tables_hist_metrics = &hosts_hist_metrics_by_table_name[i];
+    Status submit_status = host_metric_collector_thread_pool_->SubmitFunc(
+        [this, url, host_tables_metrics, host_tables_hist_metrics]() {
+          // Surface per-host failures in the log instead of dropping them.
+          WARN_NOT_OK(CollectAndReportHostLevelMetrics(url,
+                                                       host_tables_metrics,
+                                                       host_tables_hist_metrics),
+                      Substitute("Unable to collect metrics from $0", url));
+        });
+    if (!submit_status.ok()) {
+      // Tasks submitted so far still point into the local vectors above: let
+      // them finish before those vectors go out of scope.
+      host_metric_collector_thread_pool_->Wait();
+      return submit_status;
+    }
+  }
+  TRACE("Thead pool jobs submitted");
+  host_metric_collector_thread_pool_->Wait();
+  TRACE("Thead pool jobs done");
+
+  // Merge to table level metrics.
+  TablesMetrics metrics_by_table_name;
+  TablesHistMetrics hist_metrics_by_table_name;
+  RETURN_NOT_OK(MergeToTableLevelMetrics(hosts_metrics_by_table_name,
+                                         hosts_hist_metrics_by_table_name,
+                                         &metrics_by_table_name,
+                                         &hist_metrics_by_table_name));
+
+  // Merge to cluster level metrics. Start from a copy of the zero-initialized
+  // template so every round is computed from scratch.
+  Metrics cluster_metrics(cluster_metrics_);
+  RETURN_NOT_OK(MergeToClusterLevelMetrics(metrics_by_table_name,
+                                           hist_metrics_by_table_name,
+                                           &cluster_metrics));
+
+  auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  // Push table level metrics.
+  RETURN_NOT_OK(ReportTableLevelMetrics(timestamp,
+                                        metrics_by_table_name,
+                                        hist_metrics_by_table_name));
+
+  // Push cluster level metrics.
+  RETURN_NOT_OK(ReportClusterLevelMetrics(timestamp, cluster_metrics));
+
+  int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms) {
+    if (Trace::CurrentTrace()) {
+      LOG(WARNING) << "Trace:" << std::endl
+                   << Trace::CurrentTrace()->DumpToString();
+    }
+  }
+
+  return Status::OK();
+}
+
+// Merges the per-host table metrics into table level metrics.
+//
+// GAUGE/COUNTER metrics with the same table and metric name are summed across
+// hosts. HISTOGRAM metrics are not combined here: the samples of all hosts are
+// appended to one list per table and metric name.
+//
+// Both outputs must be non-null; merged data is added to whatever they already
+// contain.
+Status MetricsCollector::MergeToTableLevelMetrics(
+    const vector<TablesMetrics>& hosts_metrics_by_table_name,
+    const vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name,
+    TablesMetrics* metrics_by_table_name,
+    TablesHistMetrics* hist_metrics_by_table_name) {
+  CHECK(metrics_by_table_name);
+  CHECK(hist_metrics_by_table_name);
+
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& host_tables : hosts_metrics_by_table_name) {
+    for (const auto& table_entry : host_tables) {
+      const auto& host_table_metrics = table_entry.second;
+      metrics_count += host_table_metrics.size();
+
+      // emplace() leaves an existing entry untouched and hands it back, so a
+      // single call both inserts new tables and finds already known ones.
+      auto table_insertion =
+          metrics_by_table_name->emplace(table_entry.first, host_table_metrics);
+      if (table_insertion.second) {
+        // First host reporting this table: its metrics were copied as a whole.
+        continue;
+      }
+
+      // This table has been fetched by some other tserver.
+      auto& merged_table_metrics = table_insertion.first->second;
+      for (const auto& metric_entry : host_table_metrics) {
+        auto metric_insertion =
+            merged_table_metrics.emplace(metric_entry.first, metric_entry.second);
+        if (!metric_insertion.second) {
+          // This metric has been fetched by some other tserver.
+          metric_insertion.first->second += metric_entry.second;
+        }
+      }
+    }
+  }
+  TRACE(Substitute("Table GAUGE/COUNTER type metrics merged, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  metrics_count = 0;
+  for (const auto& host_tables : hosts_hist_metrics_by_table_name) {
+    for (const auto& table_entry : host_tables) {
+      const auto& host_table_hist_metrics = table_entry.second;
+      metrics_count += host_table_hist_metrics.size();
+
+      auto table_insertion =
+          hist_metrics_by_table_name->emplace(table_entry.first, host_table_hist_metrics);
+      if (table_insertion.second) {
+        continue;
+      }
+
+      // This table has been fetched by some other tserver.
+      auto& merged_table_hist_metrics = table_insertion.first->second;
+      for (const auto& metric_entry : host_table_hist_metrics) {
+        const auto& host_samples = metric_entry.second;
+        auto metric_insertion =
+            merged_table_hist_metrics.emplace(metric_entry.first, host_samples);
+        if (!metric_insertion.second) {
+          // This metric has been fetched by some other tserver: keep all samples.
+          auto& merged_samples = metric_insertion.first->second;
+          merged_samples.insert(merged_samples.end(),
+                                host_samples.begin(), host_samples.end());
+        }
+      }
+    }
+  }
+  TRACE(Substitute("Table HISTOGRAM type metrics merged, count $0", metrics_count));
+
+  return Status::OK();
+}
+
+// Adds the table level values of every metric named in 'cluster_metrics' to
+// that metric's cluster level value. The set of cluster metrics is defined by
+// the keys already present in 'cluster_metrics'; tables that do not expose a
+// metric contribute nothing to it. Histogram metrics are currently ignored.
+Status MetricsCollector::MergeToClusterLevelMetrics(
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& /*hist_metrics_by_table_name*/,
+    Metrics* cluster_metrics) {
+  CHECK(cluster_metrics);
+
+  for (const auto& table_entry : metrics_by_table_name) {
+    const auto& table_metrics = table_entry.second;
+    for (auto& cluster_entry : *cluster_metrics) {
+      const auto* table_value = FindOrNull(table_metrics, cluster_entry.first);
+      if (table_value != nullptr) {
+        cluster_entry.second += *table_value;
+      }
+    }
+  }
+  TRACE(Substitute("Cluster metrics merged, count $0", cluster_metrics->size()));
+
+  return Status::OK();
+}
+
+// Converts a numeric JSON metric value to int64_t and stores it in 'result'.
+//
+//  - Integers that fit into int64_t are returned unchanged.
+//  - Unsigned integers above INT64_MAX saturate at INT64_MAX. Calling
+//    GetInt64() on such a value is not valid in rapidjson, and reinterpreting
+//    the bits would yield a negative number.
+//  - Doubles are multiplied by 1000000 and truncated.
+//
+// 'metric_name' is only used in the error message. Returns NotSupported if the
+// value is none of the above.
+Status MetricsCollector::GetNumberMetricValue(const rapidjson::Value* metric,
+                                              const string& metric_name,
+                                              int64_t* result) const {
+  CHECK(result);
+  // IsInt64() is true for every integer representable as int64_t, including
+  // the ones rapidjson also classifies as Int, Uint or Uint64.
+  if (metric->IsInt64()) {
+    *result = metric->GetInt64();
+    return Status::OK();
+  }
+
+  // Only reached for unsigned values that do not fit into int64_t.
+  if (metric->IsUint64()) {
+    *result = INT64_MAX;
+    return Status::OK();
+  }
+
+  if (metric->IsDouble()) {
+    double result_temp = metric->GetDouble();
+    // Multiply by 1000000 and convert to int64_t to avoid much data loss and keep compatibility
+    // with monitor system like Falcon.
+    *result = static_cast<int64_t>(result_temp * 1000000);
+    return Status::OK();
+  }
+
+  return Status::NotSupported(Substitute("unsupported metric $0", metric_name));
+}
+
+// Converts a string-valued metric to an integer. Only the "state" metric is
+// supported (see ConvertStateToInt()); any other metric name yields
+// NotSupported. 'metric' must hold a JSON string.
+Status MetricsCollector::GetStringMetricValue(const Value* metric,
+                                              const string& metric_name,
+                                              int64_t* result) const {
+  CHECK(result);
+  const string text(metric->GetString());
+  if (metric_name != "state") {
+    return Status::NotSupported(Substitute("unsupported metric $0", metric_name));
+  }
+  return ConvertStateToInt(text, result);
+}
+
+// Maps a merged table state string to 1 or 0.
+//
+// '*result' is 1 only if 'value' is a non-empty sequence of back-to-back
+// "RUNNING" tokens (e.g. "RUNNINGRUNNING"); it is 0 for the empty string and
+// for anything else. Always returns OK.
+Status MetricsCollector::ConvertStateToInt(const string& value, int64_t* result) {
+  CHECK(result);
+  // TODO(yingchun) Here, table state is merged by several original tablet states, which is
+  // concatenated from several sub-strings, like 'RUNNING', 'BOOTSTRAPPING', etc. It's tricky
+  // to fetch state now, we will improve in server side later.
+  const char* const kRunning = "RUNNING";
+  const size_t kRunningLen = strlen(kRunning);
+
+  // Assume "not all running" until every token has been checked.
+  *result = 0;
+  if (value.empty() || value.size() % kRunningLen != 0) {
+    return Status::OK();
+  }
+  for (size_t offset = 0; offset < value.size(); offset += kRunningLen) {
+    if (strncmp(kRunning, value.c_str() + offset, kRunningLen) != 0) {
+      return Status::OK();
+    }
+  }
+  *result = 1;
+  return Status::OK();
+}
+
+// Decides whether 'entity' should be dropped because of the attribute filter.
+//
+// Returns false (keep) when no filter is configured, or when at least one
+// filtered attribute of the entity has one of the accepted values. Returns
+// true (drop) otherwise. Dies if a filter is configured and the entity has no
+// "attributes" object.
+bool MetricsCollector::FilterByAttribute(const JsonReader& r,
+                                         const rapidjson::Value* entity) const {
+  if (attributes_filter_.empty()) {
+    return false;
+  }
+
+  const Value* attributes;
+  CHECK_OK(r.ExtractObject(entity, "attributes", &attributes));
+
+  for (const auto& filter_entry : attributes_filter_) {
+    string actual_value;
+    if (!r.ExtractString(attributes, filter_entry.first.c_str(), &actual_value).ok()) {
+      // The entity does not carry this attribute, try the next filter entry.
+      continue;
+    }
+    if (ContainsKey(filter_entry.second, actual_value)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Placeholder: "server" entities are not parsed yet. Always returns
+// NotSupported; ParseMetrics() relies on exactly that status.
+Status MetricsCollector::ParseServerMetrics(const JsonReader& /*r*/,
+ const rapidjson::Value* /*entity*/) {
+ return Status::NotSupported("server entity is not supported");
+}
+
+// Parses the metrics of one "table" entity.
+//
+// The table name is the entity's "id". Every metric whose type is known from
+// the "tablet" metric type table is stored under that table name:
+//  - GAUGE/COUNTER values go to 'metrics_by_table_name' and are additionally
+//    summed into 'host_metrics';
+//  - HISTOGRAM metrics yield one "<name>_<percentile>" entry per registered
+//    percentile, holding a (total_count, percentile value) sample that goes to
+//    'hist_metrics_by_table_name' and is appended to 'host_hist_metrics'.
+// Metrics with an unknown type are logged and skipped.
+//
+// Dies if the table is already present in one of the per-table outputs, if a
+// required JSON field is missing, or if a value or metric type is unexpected.
+// Returns an error if a GAUGE/COUNTER metric has no "value" field.
+Status MetricsCollector::ParseTableMetrics(const JsonReader& r,
+                                           const rapidjson::Value* entity,
+                                           TablesMetrics* metrics_by_table_name,
+                                           Metrics* host_metrics,
+                                           TablesHistMetrics* hist_metrics_by_table_name,
+                                           HistMetrics* host_hist_metrics) const {
+  CHECK(metrics_by_table_name);
+  CHECK(host_metrics);
+  CHECK(hist_metrics_by_table_name);
+  CHECK(host_hist_metrics);
+
+  string table_name;
+  CHECK_OK(r.ExtractString(entity, "id", &table_name));
+  CHECK(!ContainsKey(*metrics_by_table_name, table_name));
+  CHECK(!ContainsKey(*hist_metrics_by_table_name, table_name));
+
+  // The table is known to be absent (checked above), so both emplace() calls
+  // insert a fresh, empty entry.
+  auto& table_metrics =
+      metrics_by_table_name->emplace(table_name, Metrics()).first->second;
+  auto& table_hist_metrics =
+      hist_metrics_by_table_name->emplace(table_name, HistMetrics()).first->second;
+
+  // The type table does not change while parsing, look it up only once. Its
+  // presence is only required once there is a metric to classify.
+  const auto* tablet_metric_types = FindOrNull(metric_types_by_entity_type_, "tablet");
+
+  vector<const Value*> metrics;
+  CHECK_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
+  for (const Value* metric : metrics) {
+    string name;
+    CHECK_OK(r.ExtractString(metric, "name", &name));
+    CHECK(tablet_metric_types);
+    const auto* known_type = FindOrNull(*tablet_metric_types, name);
+    if (!known_type) {
+      LOG(ERROR) << Substitute("metric $0 has unknown type, ignore it", name);
+      continue;
+    }
+
+    if (*known_type == "GAUGE" || *known_type == "COUNTER") {
+      const Value* val;
+      RETURN_NOT_OK(r.ExtractField(metric, "value", &val));
+
+      int64_t value = 0;
+      const rapidjson::Type json_type = val->GetType();
+      if (json_type == rapidjson::Type::kStringType) {
+        CHECK_OK(GetStringMetricValue(val, name, &value));
+      } else if (json_type == rapidjson::Type::kNumberType) {
+        CHECK_OK(GetNumberMetricValue(val, name, &value));
+      } else {
+        LOG(FATAL) << "Unknown type, metrics name: " << name;
+      }
+
+      EmplaceOrDie(&table_metrics, std::make_pair(name, value));
+
+      // Host level: sum the value over all tables of this host.
+      auto host_insertion = host_metrics->emplace(name, value);
+      if (!host_insertion.second) {
+        host_insertion.first->second += value;
+      }
+    } else if (*known_type == "HISTOGRAM") {
+      for (const auto& percentile : kRegisterPercentiles) {
+        string hist_metric_name(name);
+        hist_metric_name += "_" + percentile;
+
+        int64_t total_count;
+        CHECK_OK(r.ExtractInt64(metric, "total_count", &total_count));
+        int64_t percentile_value;
+        CHECK_OK(r.ExtractInt64(metric, percentile.c_str(), &percentile_value));
+
+        vector<SimpleHistogram> tmp({{total_count, percentile_value}});
+        EmplaceOrDie(&table_hist_metrics, std::make_pair(hist_metric_name, tmp));
+
+        // Host level: keep one sample per table, they are combined later.
+        auto host_insertion = host_hist_metrics->emplace(hist_metric_name, tmp);
+        if (!host_insertion.second) {
+          host_insertion.first->second.emplace_back(tmp[0]);
+        }
+      }
+    } else {
+      LOG(FATAL) << "Unknown metric type: " << *known_type;
+    }
+  }
+
+  return Status::OK();
+}
+
+// Placeholder: "tablet" entities are not parsed yet. Always returns
+// NotSupported; ParseMetrics() relies on exactly that status.
+Status MetricsCollector::ParseTabletMetrics(const JsonReader& /*r*/,
+ const rapidjson::Value* /*entity*/) {
+ return Status::NotSupported("tablet entity is not supported");
+}
+
+// Collects the metrics of one host and reports its host level views.
+//
+// Fetches 'url', parses the response into per-table metrics (returned through
+// the two output parameters for later merging) and into host level sums, then
+// reports the host-table level metrics (unless disabled by
+// FLAGS_collector_ignore_hosttable_level_metrics) and the host level metrics.
+// If the whole call took longer than FLAGS_collector_warn_threshold_ms, its
+// trace is logged as a warning.
+Status MetricsCollector::CollectAndReportHostLevelMetrics(
+    const string& url,
+    TablesMetrics* metrics_by_table_name,
+    TablesHistMetrics* hist_metrics_by_table_name) {
+  const MonoTime begin_time(MonoTime::Now());
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT1("collector", "MetricsCollector::CollectAndReportHostLevelMetrics",
+               "url", url);
+  TRACE("init");
+  CHECK(metrics_by_table_name);
+  CHECK(hist_metrics_by_table_name);
+
+  // Get metrics from server.
+  string response;
+  RETURN_NOT_OK(GetMetrics(url, &response));
+
+  // Merge metrics by table and metric type.
+  Metrics host_metrics;
+  HistMetrics host_hist_metrics;
+  RETURN_NOT_OK(ParseMetrics(response, metrics_by_table_name, &host_metrics,
+                             hist_metrics_by_table_name, &host_hist_metrics));
+
+  const string host_name = ExtractHostName(url);
+  const auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  // Host table level.
+  const bool report_host_table_level = !FLAGS_collector_ignore_hosttable_level_metrics;
+  if (report_host_table_level) {
+    RETURN_NOT_OK(ReportHostTableLevelMetrics(host_name, timestamp,
+                                              *metrics_by_table_name,
+                                              *hist_metrics_by_table_name));
+  }
+
+  // Host level.
+  RETURN_NOT_OK(ReportHostLevelMetrics(host_name, timestamp,
+                                       host_metrics,
+                                       host_hist_metrics));
+
+  const int64_t elapsed_ms = (MonoTime::Now() - begin_time).ToMilliseconds();
+  if (elapsed_ms > FLAGS_collector_warn_threshold_ms && Trace::CurrentTrace()) {
+    LOG(WARNING) << "Trace:" << std::endl
+                 << Trace::CurrentTrace()->DumpToString();
+  }
+  return Status::OK();
+}
+
+// Parses a metrics response (a JSON array of entities) of one host.
+//
+// Entities rejected by the attribute filter are skipped. "table" entities are
+// parsed into the four outputs; "server" and "tablet" entities are accepted
+// but ignored, since their parsers are not supported yet. Any other entity type
+// is fatal. Returns an error if 'data' is not a valid JSON array of objects.
+Status MetricsCollector::ParseMetrics(const string& data,
+                                      TablesMetrics* metrics_by_table_name,
+                                      Metrics* host_metrics,
+                                      TablesHistMetrics* hist_metrics_by_table_name,
+                                      HistMetrics* host_hist_metrics) {
+  JsonReader r(data);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+
+  for (const Value* entity : entities) {
+    if (FilterByAttribute(r, entity)) {
+      continue;
+    }
+
+    string entity_type;
+    CHECK_OK(r.ExtractString(entity, "type", &entity_type));
+
+    if (entity_type == "table") {
+      CHECK_OK(ParseTableMetrics(r, entity,
+                                 metrics_by_table_name, host_metrics,
+                                 hist_metrics_by_table_name, host_hist_metrics));
+      continue;
+    }
+    if (entity_type == "server") {
+      CHECK(ParseServerMetrics(r, entity).IsNotSupported());
+      continue;
+    }
+    if (entity_type == "tablet") {
+      CHECK(ParseTabletMetrics(r, entity).IsNotSupported());
+      continue;
+    }
+    LOG(FATAL) << "Unknown entity_type: " << entity_type;
+  }
+  TRACE(Substitute("Metrics parsed, entity count $0", entities.size()));
+
+  return Status::OK();
+}
+
+// Appends one report item per GAUGE/COUNTER metric in 'metrics' to 'items'.
+//
+// Each item is built by the reporter from the endpoint, metric name, level,
+// timestamp, value, the metric's type as registered for "tablet" entities, and
+// the extra tags. Dies if a metric (or the "tablet" type table itself) is
+// missing from metric_types_by_entity_type_.
+void MetricsCollector::CollectMetrics(const string& endpoint,
+                                      const Metrics& metrics,
+                                      const std::string& level,
+                                      uint64_t timestamp,
+                                      const std::string& extra_tags,
+                                      list<scoped_refptr<ItemBase>>* items) {
+  // Look the type table up once and without operator[], which would insert a
+  // missing key into the shared map; this method is called from several
+  // collector threads at the same time.
+  const auto* tablet_metric_types = FindOrNull(metric_types_by_entity_type_, "tablet");
+  for (const auto& metric : metrics) {
+    CHECK(tablet_metric_types) << "metric types of tablet entities are not initialized";
+    items->emplace_back(
+        reporter_->ConstructItem(endpoint,
+                                 metric.first,
+                                 level,
+                                 timestamp,
+                                 metric.second,
+                                 FindOrDie(*tablet_metric_types, metric.first),
+                                 extra_tags));
+  }
+}
+
+// Appends one report item per histogram metric in 'metrics' to 'items'.
+//
+// The samples of each metric are first reduced to a single value by
+// GetHistValue(); the resulting item is always reported with type "GAUGE".
+void MetricsCollector::CollectMetrics(const string& endpoint,
+                                      const HistMetrics& metrics,
+                                      const string& level,
+                                      uint64_t timestamp,
+                                      const string& extra_tags,
+                                      list<scoped_refptr<ItemBase>>* items) {
+  for (const auto& name_and_samples : metrics) {
+    const int64_t combined_value = GetHistValue(name_and_samples.second);
+    items->emplace_back(
+        reporter_->ConstructItem(endpoint,
+                                 name_and_samples.first,
+                                 level,
+                                 timestamp,
+                                 combined_value,
+                                 "GAUGE",
+                                 extra_tags));
+  }
+}
+
+// Reports the metrics of every table on one host ("host_table" level).
+//
+// All items use 'host_name' as endpoint and carry a "table=<table name>" tag.
+// GAUGE/COUNTER and HISTOGRAM items are gathered into one list and pushed to
+// the reporter in a single call.
+Status MetricsCollector::ReportHostTableLevelMetrics(
+    const string& host_name,
+    uint64_t timestamp,
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& hist_metrics_by_table_name) {
+  list<scoped_refptr<ItemBase>> items;
+
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& table_entry : metrics_by_table_name) {
+    const string table_tag = Substitute("table=$0", table_entry.first);
+    metrics_count += table_entry.second.size();
+    CollectMetrics(host_name, table_entry.second, "host_table", timestamp, table_tag, &items);
+  }
+  TRACE(Substitute("Host-table GAUGE/COUNTER type metrics collected, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  int hist_metrics_count = 0;
+  for (const auto& table_entry : hist_metrics_by_table_name) {
+    const string table_tag = Substitute("table=$0", table_entry.first);
+    hist_metrics_count += table_entry.second.size();
+    CollectMetrics(host_name, table_entry.second, "host_table", timestamp, table_tag, &items);
+  }
+  TRACE(Substitute("Host-table HISTOGRAM type metrics collected, count $0", hist_metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Host-table metrics reported, count $0", metrics_count + hist_metrics_count));
+
+  return Status::OK();
+}
+
+// Reports the metrics summed over all tables of one host ("host" level).
+//
+// Items use 'host_name' as endpoint and carry no extra tags. GAUGE/COUNTER and
+// HISTOGRAM items are pushed to the reporter together.
+Status MetricsCollector::ReportHostLevelMetrics(
+    const string& host_name,
+    uint64_t timestamp,
+    const Metrics& host_metrics,
+    const HistMetrics& host_hist_metrics) {
+  const auto metrics_count = host_metrics.size();
+  const auto hist_metrics_count = host_hist_metrics.size();
+  list<scoped_refptr<ItemBase>> items;
+
+  // GAUGE/COUNTER type metrics.
+  CollectMetrics(host_name, host_metrics, "host", timestamp, "", &items);
+  TRACE(Substitute("Host GAUGE/COUNTER type metrics collected, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  CollectMetrics(host_name, host_hist_metrics, "host", timestamp, "", &items);
+  TRACE(Substitute("Host HISTOGRAM type metrics collected, count $0", hist_metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Host metrics reported, count $0",
+                   metrics_count + hist_metrics_count));
+
+  return Status::OK();
+}
+
+// Reports the metrics merged over all hosts for every table ("table" level).
+//
+// The table name is used as the endpoint of each item and no extra tags are
+// attached. GAUGE/COUNTER and HISTOGRAM items are pushed to the reporter
+// together.
+Status MetricsCollector::ReportTableLevelMetrics(
+    uint64_t timestamp,
+    const TablesMetrics& metrics_by_table_name,
+    const TablesHistMetrics& hist_metrics_by_table_name) {
+  list<scoped_refptr<ItemBase>> items;
+
+  // GAUGE/COUNTER type metrics.
+  int metrics_count = 0;
+  for (const auto& table_entry : metrics_by_table_name) {
+    const auto& table_name = table_entry.first;
+    const auto& table_metrics = table_entry.second;
+    metrics_count += table_metrics.size();
+    CollectMetrics(table_name, table_metrics, "table", timestamp, "", &items);
+  }
+  TRACE(Substitute("Table GAUGE/COUNTER type metrics collected, count $0", metrics_count));
+
+  // HISTOGRAM type metrics.
+  int hist_metrics_count = 0;
+  for (const auto& table_entry : hist_metrics_by_table_name) {
+    const auto& table_name = table_entry.first;
+    const auto& table_hist_metrics = table_entry.second;
+    hist_metrics_count += table_hist_metrics.size();
+    CollectMetrics(table_name, table_hist_metrics, "table", timestamp, "", &items);
+  }
+  TRACE(Substitute("Table HISTOGRAM type metrics collected, count $0", hist_metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Table metrics reported, count $0", metrics_count + hist_metrics_count));
+
+  return Status::OK();
+}
+
+// Reports the cluster wide metrics ("cluster" level), using
+// FLAGS_collector_cluster_name as the endpoint and no extra tags.
+Status MetricsCollector::ReportClusterLevelMetrics(uint64_t timestamp,
+                                                   const Metrics& cluster_metrics) {
+  const auto metrics_count = cluster_metrics.size();
+
+  list<scoped_refptr<ItemBase>> items;
+  CollectMetrics(FLAGS_collector_cluster_name, cluster_metrics, "cluster", timestamp, "", &items);
+  TRACE(Substitute("Cluster metrics collected, count $0", metrics_count));
+
+  reporter_->PushItems(std::move(items));
+  TRACE(Substitute("Cluster metrics reported, count $0", metrics_count));
+
+  return Status::OK();
+}
+
+// Reduces a list of histogram samples to one value: the average of the sample
+// values weighted by their counts, rounded to the nearest integer.
+// Returns 0 when the total count is 0 (including an empty list).
+int64_t MetricsCollector::GetHistValue(const vector<SimpleHistogram>& hist_values) {
+  int64_t count_sum = 0;
+  double weighted_sum = 0.0;
+  for (const auto& sample : hist_values) {
+    count_sum += sample.count;
+    weighted_sum += sample.count * sample.value;
+  }
+
+  // Avoid dividing by zero when there is nothing to average.
+  if (count_sum == 0) {
+    return 0;
+  }
+  return std::llround(weighted_sum / count_sum);
+}
+
+// Fetches 'url' and stores the response body in 'resp'.
+//
+// The request advertises gzip support. If the body can be uncompressed, the
+// uncompressed text is returned; otherwise the body is returned as received.
+// Returns an error only if the fetch itself fails.
+Status MetricsCollector::GetMetrics(const string& url, string* resp) {
+  CHECK(resp);
+
+  EasyCurl curl;
+  faststring raw_response;
+  RETURN_NOT_OK(curl.FetchURL(url, &raw_response, {"Accept-Encoding: gzip"}));
+
+  const string raw_body = raw_response.ToString();
+  std::ostringstream uncompressed;
+  const bool uncompress_ok = zlib::Uncompress(Slice(raw_body), &uncompressed).ok();
+  if (uncompress_ok) {
+    *resp = uncompressed.str();
+  } else {
+    // Not compressed (or not valid gzip): hand the body through unchanged.
+    *resp = raw_body;
+  }
+  TRACE(Substitute("Metrics got from server: $0", url));
+
+  return Status::OK();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/metrics_collector.h b/src/kudu/collector/metrics_collector.h
new file mode 100644
index 0000000..1435e52
--- /dev/null
+++ b/src/kudu/collector/metrics_collector.h
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class JsonReader;
+class Thread;
+class ThreadPool;
+
+namespace collector {
+struct ItemBase;
+} // namespace collector
+} // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class NodesChecker;
+class ReporterBase;
+
+// Ref-counted component that fetches metrics from servers over HTTP (see
+// GetMetrics), parses them, merges them to table and cluster level, and hands
+// the resulting items to a ReporterBase.
+// NOTE(review): the set of servers queried presumably comes from
+// 'nodes_checker_' -- confirm in the .cc file.
+class MetricsCollector : public RefCounted<MetricsCollector> {
+ public:
+ MetricsCollector(scoped_refptr<NodesChecker> nodes_checker,
+ scoped_refptr<ReporterBase> reporter);
+ ~MetricsCollector();
+
+ // Lifecycle entry points.
+ Status Init();
+ Status Start();
+ void Shutdown();
+
+ std::string ToString() const;
+
+ private:
+ friend class RefCounted<MetricsCollector>;
+
+ // Unit tests exercise the private helpers directly.
+ FRIEND_TEST(TestMetricsCollector, TestConvertStateToInt);
+ FRIEND_TEST(TestMetricsCollector, TestGetHistValue);
+ FRIEND_TEST(TestMetricsCollector, TestMergeToTableLevelMetrics);
+ FRIEND_TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics);
+ FRIEND_TEST(TestMetricsCollector, TestParseMetrics);
+ FRIEND_TEST(TestMetricsCollector, TestInitMetrics);
+ FRIEND_TEST(TestMetricsCollector, TestInitFilters);
+ FRIEND_TEST(TestMetricsCollector, TestInitMetricsUrlParameters);
+ FRIEND_TEST(TestMetricsCollector, TestInitClusterLevelMetrics);
+
+ // String key ---> integer value (presumably keyed by metric name -- confirm).
+ typedef std::unordered_map<std::string, int64_t> Metrics;
+ // String key ---> Metrics; used below for parameters named
+ // '*metrics_by_table_name'.
+ typedef std::unordered_map<std::string, Metrics> TablesMetrics;
+ // One histogram sample: a 'value' together with the 'count' it applies to.
+ // GetHistValue() combines a list of these into a count-weighted mean.
+ struct SimpleHistogram {
+ int64_t count;
+ int64_t value;
+ SimpleHistogram(int64_t c, int64_t v) : count(c), value(v) {
+ }
+ inline bool operator==(const SimpleHistogram& rhs) const {
+ return count == rhs.count && value == rhs.value;
+ }
+ };
+
+ // Histogram counterparts of Metrics / TablesMetrics.
+ typedef std::unordered_map<std::string, std::vector<SimpleHistogram>> HistMetrics;
+ typedef std::unordered_map<std::string, HistMetrics> TablesHistMetrics;
+
+ // String ---> string (presumably metric name ---> metric type -- confirm).
+ typedef std::unordered_map<std::string, std::string> MetricTypes;
+
+ // Initialization helpers.
+ Status ValidateTableFilter(const std::string& attribute_filter, const std::string& table_filter);
+ Status InitMetrics();
+ static Status ExtractMetricTypes(const JsonReader& r,
+ const rapidjson::Value* entity,
+ MetricTypes* metric_types);
+ Status InitFilters();
+ Status InitMetricsUrlParameters();
+ Status InitClusterLevelMetrics();
+
+ // Background collection thread.
+ Status StartMetricCollectorThread();
+ void MetricCollectorThread();
+ Status CollectAndReportMetrics();
+
+ Status UpdateThreadPool(int32_t thread_count);
+
+ Status CollectAndReportHostLevelMetrics(const std::string& url,
+ TablesMetrics* metrics_by_table_name,
+ TablesHistMetrics* hist_metrics_by_table_name);
+
+ // Merge helpers: per-host results ---> table level ---> cluster level.
+ static Status MergeToTableLevelMetrics(
+ const std::vector<TablesMetrics>& hosts_metrics_by_table_name,
+ const std::vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name,
+ TablesMetrics* metrics_by_table_name,
+ TablesHistMetrics* hist_metrics_by_table_name);
+ static Status MergeToClusterLevelMetrics(const TablesMetrics& metrics_by_table_name,
+ const TablesHistMetrics& hist_metrics_by_table_name,
+ Metrics* cluster_metrics);
+
+ // Report metrics to third-party monitor system.
+ // Two overloads: one for plain metrics, one for histogram metrics. Both
+ // append to '*items'.
+ void CollectMetrics(const std::string& endpoint,
+ const Metrics& metrics,
+ const std::string& level,
+ uint64_t timestamp,
+ const std::string& extra_tags,
+ std::list<scoped_refptr<ItemBase>>* items);
+ void CollectMetrics(const std::string& endpoint,
+ const HistMetrics& metrics,
+ const std::string& level,
+ uint64_t timestamp,
+ const std::string& extra_tags,
+ std::list<scoped_refptr<ItemBase>>* items);
+
+ // Reporting entry points, one per aggregation level.
+ Status ReportHostTableLevelMetrics(const std::string& host_name,
+ uint64_t timestamp,
+ const TablesMetrics& metrics_by_table_name,
+ const TablesHistMetrics& hist_metrics_by_table_name);
+ Status ReportHostLevelMetrics(const std::string& host_name,
+ uint64_t timestamp,
+ const Metrics& host_metrics,
+ const HistMetrics& host_hist_metrics);
+ Status ReportTableLevelMetrics(uint64_t timestamp,
+ const TablesMetrics& metrics_by_table_name,
+ const TablesHistMetrics& hist_metrics_by_table_name);
+ Status ReportClusterLevelMetrics(uint64_t timestamp,
+ const Metrics& cluster_metrics);
+ // Returns the count-weighted mean of the samples, rounded to an integer;
+ // 0 when the total count is zero.
+ static int64_t GetHistValue(const std::vector<SimpleHistogram>& hist_values);
+
+ // Get metrics from server by http method.
+ static Status GetMetrics(const std::string& url, std::string* resp);
+
+ // Parse metrics from http response, entities may be in different types.
+ Status ParseMetrics(const std::string& data,
+ TablesMetrics* metrics_by_table_name,
+ Metrics* host_metrics,
+ TablesHistMetrics* hist_metrics_by_table_name,
+ HistMetrics* host_hist_metrics);
+ static Status ParseServerMetrics(const JsonReader& r,
+ const rapidjson::Value* entity);
+ Status ParseTableMetrics(const JsonReader& r,
+ const rapidjson::Value* entity,
+ TablesMetrics* metrics_by_table_name,
+ Metrics* host_metrics,
+ TablesHistMetrics* hist_metrics_by_table_name,
+ HistMetrics* host_hist_metrics) const;
+ static Status ParseTabletMetrics(const JsonReader& r,
+ const rapidjson::Value* entity);
+
+ // Return true when this entity could be filtered.
+ // When server side support attributes filter, this function has no effect.
+ bool FilterByAttribute(const JsonReader& r,
+ const rapidjson::Value* entity) const;
+ // Value extraction helpers; each writes an int64 into '*result'.
+ Status GetNumberMetricValue(const rapidjson::Value* metric,
+ const std::string& metric_name,
+ int64_t* result) const;
+ Status GetStringMetricValue(const rapidjson::Value* metric,
+ const std::string& metric_name,
+ int64_t* result) const;
+ static Status ConvertStateToInt(const std::string& value, int64_t* result);
+
+ static const std::set<std::string> kRegisterPercentiles;
+
+ bool initialized_;
+
+ scoped_refptr<NodesChecker> nodes_checker_;
+ scoped_refptr<ReporterBase> reporter_;
+
+ std::map<std::string, MetricTypes> metric_types_by_entity_type_;
+ // Attribute filter, attributes not in this map will be filtered if it's not empty.
+ // attribute name ---> attribute values
+ std::unordered_map<std::string, std::set<std::string>> attributes_filter_;
+ std::string metric_url_parameters_;
+ Metrics cluster_metrics_;
+
+ // Background thread state.
+ CountDownLatch stop_background_threads_latch_;
+ scoped_refptr<Thread> metric_collector_thread_;
+ std::unique_ptr<ThreadPool> host_metric_collector_thread_pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(MetricsCollector);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/nodes_checker-test.cc b/src/kudu/collector/nodes_checker-test.cc
new file mode 100644
index 0000000..2390a52
--- /dev/null
+++ b/src/kudu/collector/nodes_checker-test.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/nodes_checker.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/rebalance/cluster_status.h"
+
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::HealthCheckResult;
+
+namespace kudu {
+namespace collector {
+
+// Every known server-health string must map to the matching ServerHealth value.
+TEST(TestNodesChecker, TestExtractServerHealthStatus) {
+  const struct {
+    const char* text;
+    ServerHealth expected;
+  } kCases[] = {
+    { "HEALTHY", ServerHealth::HEALTHY },
+    { "UNAUTHORIZED", ServerHealth::UNAUTHORIZED },
+    { "UNAVAILABLE", ServerHealth::UNAVAILABLE },
+    { "WRONG_SERVER_UUID", ServerHealth::WRONG_SERVER_UUID },
+  };
+  for (const auto& test_case : kCases) {
+    ASSERT_EQ(test_case.expected,
+              NodesChecker::ExtractServerHealthStatus(test_case.text));
+  }
+}
+
+// Every known table-health string must map to the matching HealthCheckResult.
+TEST(TestNodesChecker, TestExtractTableHealthStatus) {
+  const struct {
+    const char* text;
+    HealthCheckResult expected;
+  } kCases[] = {
+    { "HEALTHY", HealthCheckResult::HEALTHY },
+    { "RECOVERING", HealthCheckResult::RECOVERING },
+    { "UNDER_REPLICATED", HealthCheckResult::UNDER_REPLICATED },
+    { "UNAVAILABLE", HealthCheckResult::UNAVAILABLE },
+    { "CONSENSUS_MISMATCH", HealthCheckResult::CONSENSUS_MISMATCH },
+  };
+  for (const auto& test_case : kCases) {
+    ASSERT_EQ(test_case.expected,
+              NodesChecker::ExtractTableHealthStatus(test_case.text));
+  }
+}
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/collector/nodes_checker.cc b/src/kudu/collector/nodes_checker.cc
new file mode 100644
index 0000000..8b64c29
--- /dev/null
+++ b/src/kudu/collector/nodes_checker.cc
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/collector/nodes_checker.h"
+
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <ostream>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/collector/collector_util.h"
+#include "kudu/collector/reporter_base.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+DECLARE_string(collector_cluster_name);
+DECLARE_string(collector_master_addrs);
+DECLARE_int32(collector_interval_sec);
+DECLARE_int32(collector_timeout_sec);
+DECLARE_int32(collector_warn_threshold_ms);
+
+using rapidjson::Value;
+using std::list;
+using std::string;
+using std::vector;
+using strings::Substitute;
+using kudu::cluster_summary::ServerHealth;
+using kudu::cluster_summary::HealthCheckResult;
+
+namespace kudu {
+namespace collector {
+
+// Role names; UpdateServers() passes one of these as the first argument of the
+// kudu tool's 'list' command.
+const std::string NodesChecker::kMaster = "master";
+const std::string NodesChecker::kTserver = "tserver";
+
+// Takes a reference to the reporter that will receive health items. The object
+// starts uninitialized; the stop latch starts with a count of 1.
+NodesChecker::NodesChecker(scoped_refptr<ReporterBase> reporter)
+ : initialized_(false),
+ reporter_(std::move(reporter)),
+ stop_background_threads_latch_(1) {
+}
+
+// Shuts down on destruction (Shutdown() is a no-op if Init() never succeeded).
+NodesChecker::~NodesChecker() {
+ Shutdown();
+}
+
+// Loads the initial master and tserver address lists. Returns the error from
+// UpdateNodes() on failure. CHECK-fails if called on an already initialized
+// object or if no master address was found.
+Status NodesChecker::Init() {
+ CHECK(!initialized_);
+
+ RETURN_NOT_OK(UpdateNodes());
+ CHECK(!master_http_addrs_.empty());
+
+ initialized_ = true;
+ return Status::OK();
+}
+
+// Launches the background checker thread. Init() must have succeeded first.
+Status NodesChecker::Start() {
+  CHECK(initialized_);
+  return StartNodesCheckerThread();
+}
+
+// Signals the background thread to stop and waits for it to finish.
+// Does nothing when the object was never initialized.
+void NodesChecker::Shutdown() {
+  if (!initialized_) {
+    return;
+  }
+
+  const string name = ToString();
+  LOG(INFO) << name << " shutting down...";
+
+  stop_background_threads_latch_.CountDown();
+
+  // The thread only exists if Start() was called.
+  if (nodes_checker_thread_) {
+    nodes_checker_thread_->Join();
+  }
+
+  LOG(INFO) << name << " shutdown complete.";
+}
+
+// Returns the fixed component name used in log messages.
+string NodesChecker::ToString() const {
+ return "NodesChecker";
+}
+
+// Returns a copy of the current tserver HTTP address list, taken under a
+// shared (read) lock.
+vector<string> NodesChecker::GetNodes() {
+ shared_lock<RWMutex> l(nodes_lock_);
+ return tserver_http_addrs_;
+}
+
+// Returns the first entry of the master HTTP address list, read under a
+// shared (read) lock. CHECK-fails if the list is empty.
+string NodesChecker::GetFirstMaster() {
+ shared_lock<RWMutex> l(nodes_lock_);
+ CHECK(!master_http_addrs_.empty());
+ return master_http_addrs_[0];
+}
+
+// Creates the "nodes-checker" thread (category "collector") running
+// NodesCheckerThread() and stores it in 'nodes_checker_thread_'.
+Status NodesChecker::StartNodesCheckerThread() {
+ return Thread::Create("collector", "nodes-checker", &NodesChecker::NodesCheckerThread,
+ this, &nodes_checker_thread_);
+}
+
+// Body of the background thread: runs UpdateAndCheckNodes() once, then repeats
+// every FLAGS_collector_interval_sec seconds. The loop ends after the first
+// pass when RunOnceMode() is true, or when the wait on the stop latch returns
+// true (presumably because Shutdown() counted it down -- confirm against
+// CountDownLatch::WaitUntil).
+void NodesChecker::NodesCheckerThread() {
+  MonoTime check_time;
+  do {
+    check_time = MonoTime::Now();
+    UpdateAndCheckNodes();
+    // The next deadline is measured from the start of this pass, so the time
+    // spent checking counts against the interval.
+    check_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec);
+  } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(check_time));
+  // Name this thread in the exit message (it previously named another thread).
+  LOG(INFO) << "NodesCheckerThread exit";
+}
+
+// Runs one refresh-and-check pass under a fresh Trace. Failures of either step
+// are only logged as warnings. If the pass takes longer than
+// FLAGS_collector_warn_threshold_ms, the collected trace is dumped to the log.
+void NodesChecker::UpdateAndCheckNodes() {
+  LOG(INFO) << "Start to UpdateAndCheckNodes";
+  const MonoTime begin = MonoTime::Now();
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  TRACE_EVENT0("collector", "NodesChecker::UpdateAndCheckNodes");
+  WARN_NOT_OK(UpdateNodes(), "Unable to update nodes");
+  WARN_NOT_OK(CheckNodes(), "Unable to check nodes");
+
+  const int64_t elapsed_ms = (MonoTime::Now() - begin).ToMilliseconds();
+  const bool over_threshold = elapsed_ms > FLAGS_collector_warn_threshold_ms;
+  if (over_threshold && Trace::CurrentTrace()) {
+    LOG(WARNING) << "Trace:" << std::endl
+                 << Trace::CurrentTrace()->DumpToString();
+  }
+}
+
+// Refreshes the master list, then the tserver list. Stops at the first failure
+// and returns it.
+Status NodesChecker::UpdateNodes() {
+ RETURN_NOT_OK(UpdateServers(kMaster));
+ RETURN_NOT_OK(UpdateServers(kTserver));
+ return Status::OK();
+}
+
+// Refreshes the cached HTTP address list for servers of the given role
+// (kMaster or kTserver) by running "<role> list ... -format=json" through the
+// kudu tool and parsing its output.
+//
+// Returns a non-OK status if the tool fails or its output cannot be parsed; the
+// cached list is left untouched in that case. Unexpected tool output is
+// reported as an error rather than aborting the process.
+Status NodesChecker::UpdateServers(const std::string& role) {
+  DCHECK(role == kTserver || role == kMaster);
+  vector<string> args = {
+    role,
+    "list",
+    FLAGS_collector_master_addrs,
+    "-columns=http-addresses",
+    "-format=json",
+    Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000)
+  };
+  string tool_stdout;
+  string tool_stderr;
+  RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr),
+                        Substitute("out: $0, err: $1", tool_stdout, tool_stderr));
+  TRACE(Substitute("'$0 list' done", role));
+
+  JsonReader r(tool_stdout);
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> servers;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &servers));
+  vector<string> server_http_addrs;
+  server_http_addrs.reserve(servers.size());
+  for (const Value* server : servers) {
+    string http_address;
+    RETURN_NOT_OK(r.ExtractString(server, "http-addresses", &http_address));
+    server_http_addrs.emplace_back(std::move(http_address));
+  }
+  TRACE(Substitute("Result parsed, nodes count $0", server_http_addrs.size()));
+
+  {
+    // Swap the fresh list in under the write lock; the old list is destroyed
+    // after the lock is released.
+    std::lock_guard<RWMutex> l(nodes_lock_);
+    vector<string>& cached = (role == kTserver) ? tserver_http_addrs_ : master_http_addrs_;
+    cached.swap(server_http_addrs);
+  }
+  TRACE("Nodes updated");
+
+  return Status::OK();
+}
+
+// Runs "cluster ksck" through the kudu tool with compact JSON output and
+// reports the health information it contains. A failing tool run is only
+// logged as a warning; whatever it printed to stdout is still handed to
+// ReportNodesMetrics(), whose status is returned.
+Status NodesChecker::CheckNodes() const {
+  const vector<string> ksck_args = {
+    "cluster",
+    "ksck",
+    FLAGS_collector_master_addrs,
+    "-consensus=false",
+    "-ksck_format=json_compact",
+    "-color=never",
+    "-sections=MASTER_SUMMARIES,TSERVER_SUMMARIES,TABLE_SUMMARIES,TOTAL_COUNT",
+    Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000)
+  };
+  string ksck_stdout;
+  string ksck_stderr;
+  WARN_NOT_OK(tools::RunKuduTool(ksck_args, &ksck_stdout, &ksck_stderr),
+              Substitute("out: $0, err: $1", ksck_stdout, ksck_stderr));
+
+  TRACE("'cluster ksck' done");
+
+  return ReportNodesMetrics(ksck_stdout);
+}
+
+// Parses the JSON produced by "cluster ksck" ('data') and pushes these items to
+// the reporter:
+//   - one "kudu-master-health" item per entry in "master_summaries";
+//   - one "kudu-tserver-health" item per entry in "tserver_summaries" (optional);
+//   - one "kudu-table-health" item per entry in "table_summaries" (optional);
+//   - "healthy_table_proportion" (integer percentage) when any table is listed;
+//   - "<name>_count" items for each entry in "count_summaries".
+//
+// Returns a non-OK status when the JSON is malformed, a required field is
+// missing, or the push fails. Unexpected ksck output is reported as an error
+// rather than aborting the process. An unrecognized health string still
+// CHECK-fails inside the Extract*HealthStatus() helpers.
+Status NodesChecker::ReportNodesMetrics(const string& data) const {
+  JsonReader r(data);
+  RETURN_NOT_OK(r.Init());
+  const Value* ksck = nullptr;
+  RETURN_NOT_OK(r.ExtractObject(r.root(), nullptr, &ksck));
+  auto timestamp = static_cast<uint64_t>(WallTime_Now());
+
+  list<scoped_refptr<ItemBase>> items;
+  // Masters health info.
+  vector<const Value*> masters;
+  RETURN_NOT_OK(r.ExtractObjectArray(ksck, "master_summaries", &masters));
+  for (const Value* master : masters) {
+    string address;
+    RETURN_NOT_OK(r.ExtractString(master, "address", &address));
+    string health;
+    RETURN_NOT_OK(r.ExtractString(master, "health", &health));
+    items.emplace_back(reporter_->ConstructItem(
+      ExtractHostName(address),
+      "kudu-master-health",
+      "host",
+      timestamp,
+      static_cast<int64_t>(ExtractServerHealthStatus(health)),
+      "GAUGE",
+      ""));
+  }
+  TRACE(Substitute("Masters health info reported, count $0", masters.size()));
+
+  // Tservers health info. The section may be absent (NotFound); any other
+  // error is returned to the caller.
+  vector<const Value*> tservers;
+  Status s = r.ExtractObjectArray(ksck, "tserver_summaries", &tservers);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.ok()) {
+    for (const Value* tserver : tservers) {
+      string address;
+      RETURN_NOT_OK(r.ExtractString(tserver, "address", &address));
+      string health;
+      RETURN_NOT_OK(r.ExtractString(tserver, "health", &health));
+      items.emplace_back(reporter_->ConstructItem(
+        ExtractHostName(address),
+        "kudu-tserver-health",
+        "host",
+        timestamp,
+        static_cast<int64_t>(ExtractServerHealthStatus(health)),
+        "GAUGE",
+        ""));
+    }
+    TRACE(Substitute("Tservers health info reported, count $0", tservers.size()));
+  }
+
+  // Tables health info. The section may be absent (NotFound); any other error
+  // is returned to the caller.
+  uint32_t healthy_table_count = 0;
+  vector<const Value*> tables;
+  s = r.ExtractObjectArray(ksck, "table_summaries", &tables);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.ok()) {
+    for (const Value* table : tables) {
+      string name;
+      RETURN_NOT_OK(r.ExtractString(table, "name", &name));
+      string health;
+      RETURN_NOT_OK(r.ExtractString(table, "health", &health));
+      HealthCheckResult health_status = ExtractTableHealthStatus(health);
+      items.emplace_back(reporter_->ConstructItem(
+        name,
+        "kudu-table-health",
+        "table",
+        timestamp,
+        static_cast<int64_t>(health_status),
+        "GAUGE",
+        ""));
+      if (health_status == HealthCheckResult::HEALTHY) {
+        healthy_table_count += 1;
+      }
+    }
+    TRACE(Substitute("Tables health info reported, count $0", tables.size()));
+  }
+
+  // Healthy table ratio, as an integer percentage (truncated).
+  if (!tables.empty()) {
+    items.emplace_back(reporter_->ConstructItem(
+      FLAGS_collector_cluster_name,
+      "healthy_table_proportion",
+      "cluster",
+      timestamp,
+      100 * healthy_table_count / tables.size(),
+      "GAUGE",
+      ""));
+    TRACE("Healthy table ratio reported");
+  }
+
+  // Count summaries.
+  vector<const Value*> count_summaries;
+  RETURN_NOT_OK(r.ExtractObjectArray(ksck, "count_summaries", &count_summaries));
+  for (const Value* count_summary : count_summaries) {
+    // TODO(yingchun) should auto iterate items
+    static const vector<string>
+        count_names({"masters", "tservers", "tables", "tablets", "replicas"});
+    for (const auto& name : count_names) {
+      int64_t count;
+      RETURN_NOT_OK(r.ExtractInt64(count_summary, name.c_str(), &count));
+      items.emplace_back(reporter_->ConstructItem(
+        FLAGS_collector_cluster_name,
+        name + "_count",
+        "cluster",
+        timestamp,
+        count,
+        "GAUGE",
+        ""));
+    }
+  }
+  TRACE("Count summaries reported");
+
+  // PushItems() returns a Status; propagate it instead of silently dropping it.
+  RETURN_NOT_OK(reporter_->PushItems(std::move(items)));
+  TRACE("Pushed");
+
+  return Status::OK();
+}
+
+// Maps a server-health string to the corresponding ServerHealth value.
+// CHECK-fails (aborting the process) on any string other than the four below.
+ServerHealth NodesChecker::ExtractServerHealthStatus(const string& health) {
+ if (health == "HEALTHY") return ServerHealth::HEALTHY;
+ if (health == "UNAUTHORIZED") return ServerHealth::UNAUTHORIZED;
+ if (health == "UNAVAILABLE") return ServerHealth::UNAVAILABLE;
+ if (health == "WRONG_SERVER_UUID") return ServerHealth::WRONG_SERVER_UUID;
+ CHECK(false) << "Unknown server health: " << health;
+ // Not reached; tells the compiler no return value is needed here.
+ __builtin_unreachable();
+}
+
+// Maps a table-health string to the corresponding HealthCheckResult value.
+// CHECK-fails (aborting the process) on any string other than the five below.
+HealthCheckResult NodesChecker::ExtractTableHealthStatus(const string& health) {
+ if (health == "HEALTHY") return HealthCheckResult::HEALTHY;
+ if (health == "RECOVERING") return HealthCheckResult::RECOVERING;
+ if (health == "UNDER_REPLICATED") return HealthCheckResult::UNDER_REPLICATED;
+ if (health == "UNAVAILABLE") return HealthCheckResult::UNAVAILABLE;
+ if (health == "CONSENSUS_MISMATCH") return HealthCheckResult::CONSENSUS_MISMATCH;
+ CHECK(false) << "Unknown table health: " << health;
+ // Not reached; tells the compiler no return value is needed here.
+ __builtin_unreachable();
+}
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/nodes_checker.h b/src/kudu/collector/nodes_checker.h
new file mode 100644
index 0000000..26aee89
--- /dev/null
+++ b/src/kudu/collector/nodes_checker.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rebalance/cluster_status.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Thread;
+} // namespace kudu
+
+namespace kudu {
+
+namespace collector {
+
+class ReporterBase;
+
+// Ref-counted component that keeps lists of master and tserver HTTP addresses
+// and reports node/table health items through a ReporterBase.
+class NodesChecker : public RefCounted<NodesChecker> {
+ public:
+ explicit NodesChecker(scoped_refptr<ReporterBase> reporter);
+ ~NodesChecker();
+
+ // Lifecycle entry points.
+ Status Init();
+ Status Start();
+ void Shutdown();
+
+ std::string ToString() const;
+
+ // Accessors for the address lists guarded by 'nodes_lock_'.
+ std::vector<std::string> GetNodes();
+ std::string GetFirstMaster();
+
+ private:
+ friend class RefCounted<NodesChecker>;
+
+ // Unit tests call the private static Extract*HealthStatus helpers directly.
+ FRIEND_TEST(TestNodesChecker, TestExtractServerHealthStatus);
+ FRIEND_TEST(TestNodesChecker, TestExtractTableHealthStatus);
+
+ // Background thread.
+ Status StartNodesCheckerThread();
+ void NodesCheckerThread();
+
+ void UpdateAndCheckNodes();
+ Status UpdateNodes();
+ Status UpdateServers(const std::string& role);
+ Status CheckNodes() const;
+ Status ReportNodesMetrics(const std::string& data) const;
+
+ // Convert a health string into the matching enum value.
+ static cluster_summary::ServerHealth ExtractServerHealthStatus(const std::string& health);
+ static cluster_summary::HealthCheckResult ExtractTableHealthStatus(const std::string& health);
+
+ // Role names accepted by UpdateServers().
+ static const std::string kMaster;
+ static const std::string kTserver;
+
+ bool initialized_;
+
+ scoped_refptr<ReporterBase> reporter_;
+
+ CountDownLatch stop_background_threads_latch_;
+ scoped_refptr<Thread> nodes_checker_thread_;
+
+ // Guards the two address lists below.
+ mutable RWMutex nodes_lock_;
+ std::vector<std::string> tserver_http_addrs_;
+ std::vector<std::string> master_http_addrs_;
+
+ DISALLOW_COPY_AND_ASSIGN(NodesChecker);
+};
+} // namespace collector
+} // namespace kudu
diff --git a/src/kudu/collector/reporter_base.h b/src/kudu/collector/reporter_base.h
new file mode 100644
index 0000000..d03b80c
--- /dev/null
+++ b/src/kudu/collector/reporter_base.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/server/server_base.h"
+#include "kudu/tools/ksck_results.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+namespace collector {
+
+// Ref-counted base type for the items that ReporterBase::ConstructItem()
+// creates and ReporterBase::PushItems() consumes. It declares no data of its
+// own.
+struct ItemBase : public RefCounted<ItemBase> {
+ virtual ~ItemBase() = default;
+
+ private:
+ friend class RefCounted<ItemBase>;
+};
+
+// Abstract, ref-counted interface for a reporter: it builds report items and
+// accepts lists of them for pushing. All operations are pure virtual.
+class ReporterBase : public RefCounted<ReporterBase> {
+ public:
+ virtual ~ReporterBase() = default;
+
+ // Lifecycle entry points.
+ virtual Status Init() = 0;
+ virtual Status Start() = 0;
+ virtual void Shutdown() = 0;
+
+ virtual std::string ToString() const = 0;
+
+ // Builds one report item from the given fields.
+ // TODO(yingchun) This function is not generic enough for base class.
+ virtual scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
+ std::string metric,
+ std::string level,
+ uint64_t timestamp,
+ int64_t value,
+ std::string counter_type,
+ std::string extra_tags) = 0;
+ // Takes the list of items by value (callers in this patch pass it with
+ // std::move) and returns a Status.
+ virtual Status PushItems(std::list<scoped_refptr<ItemBase>> items) = 0;
+
+ protected:
+ friend class RefCounted<ReporterBase>;
+};
+} // namespace collector
+} // namespace kudu
+
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6aefb42..21de53d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -61,6 +61,7 @@
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 8e98c5d..7834ea4 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include <stdlib.h>
-
#include <algorithm>
#include <cstdint>
#include <functional>
diff --git a/src/kudu/util/jsonreader.h b/src/kudu/util/jsonreader.h
index 125b762..dcc6603 100644
--- a/src/kudu/util/jsonreader.h
+++ b/src/kudu/util/jsonreader.h
@@ -28,6 +28,10 @@
namespace kudu {
+namespace collector {
+class MetricsCollector;
+} // namespace collector
+
// Wraps the JSON parsing functionality of rapidjson::Document.
//
// Unlike JsonWriter, this class does not hide rapidjson internals from
@@ -93,6 +97,8 @@ class JsonReader {
const rapidjson::Value* root() const { return &document_; }
private:
+ friend class collector::MetricsCollector;
+
Status ExtractField(const rapidjson::Value* object,
const char* field,
const rapidjson::Value** result) const;