You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/08/15 04:25:27 UTC
[kudu] branch master updated: KUDU-2797 p2: the master aggregates
tablet statistics
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0d48cad KUDU-2797 p2: the master aggregates tablet statistics
0d48cad is described below
commit 0d48cad4976d670c22182ca8d838a568f3b7034b
Author: helifu <hz...@corp.netease.com>
AuthorDate: Mon Aug 12 10:01:28 2019 +0800
KUDU-2797 p2: the master aggregates tablet statistics
Several JIRAs discuss metrics:
KUDU-1067, KUDU-1373, KUDU-2019, KUDU-2797.
This patch makes the following improvements:
1) the disk size and live row count of the leader replicas
are aggregated on the master server;
2) the disk size and live row count of each table are exposed
as metrics on the master server;
3) the disk size and live row count of each table are exposed
on the master server's Web-UI.
Change-Id: I74406ab7cca7c22fda455c328b8ee9989a6b2d99
Reviewed-on: http://gerrit.cloudera.org:8080/13426
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/integration-tests/cluster_itest_util.cc | 41 ++++
src/kudu/integration-tests/cluster_itest_util.h | 12 ++
.../integration-tests/tablet_replacement-itest.cc | 2 +
.../integration-tests/ts_tablet_manager-itest.cc | 223 +++++++++++++++++++++
src/kudu/master/CMakeLists.txt | 1 +
src/kudu/master/catalog_manager.cc | 79 +++++++-
src/kudu/master/catalog_manager.h | 42 +++-
src/kudu/master/master.proto | 4 +
src/kudu/master/master_path_handlers.cc | 17 ++
src/kudu/master/table_metrics.cc | 41 ++++
src/kudu/master/table_metrics.h | 52 +++++
src/kudu/tablet/local_tablet_writer.h | 11 +-
src/kudu/tablet/metadata.proto | 6 +
src/kudu/tablet/tablet_replica.cc | 32 ++-
src/kudu/tablet/tablet_replica.h | 11 +
src/kudu/tserver/heartbeater.cc | 34 ++--
src/kudu/tserver/heartbeater.h | 12 +-
src/kudu/tserver/ts_tablet_manager-test.cc | 88 +++++++-
src/kudu/tserver/ts_tablet_manager.cc | 74 ++++++-
src/kudu/tserver/ts_tablet_manager.h | 15 ++
src/kudu/util/metrics.cc | 1 -
www/table.mustache | 2 +
22 files changed, 765 insertions(+), 35 deletions(-)
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 92f6cfc..0b287de 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -1270,6 +1270,47 @@ Status SetupAdministratorPrivileges(MiniKdc* kdc,
return kdc->Kinit("test-admin");
}
+// Renames the table identified by 'table_id'/'old_table_name' to
+// 'new_table_name' through the master's AlterTable RPC, bounded by 'timeout'.
+// Returns the failing status if the RPC could not be completed, or a
+// RemoteError wrapping the master's error when the response carries one.
+Status AlterTableName(const shared_ptr<MasterServiceProxy>& master_proxy,
+                      const string& table_id,
+                      const string& old_table_name,
+                      const string& new_table_name,
+                      const MonoDelta& timeout) {
+  master::AlterTableRequestPB request;
+  auto* table_identifier = request.mutable_table();
+  table_identifier->set_table_id(table_id);
+  table_identifier->set_table_name(old_table_name);
+  request.set_new_table_name(new_table_name);
+
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  master::AlterTableResponsePB response;
+  RETURN_NOT_OK(master_proxy->AlterTable(request, &response, &rpc));
+  RETURN_NOT_OK(rpc.status());
+
+  // The RPC itself went through; surface any error carried in the response.
+  if (!response.has_error()) {
+    return Status::OK();
+  }
+  return Status::RemoteError("Response had an error", SecureShortDebugString(response.error()));
+}
+
+// Deletes the table identified by 'table_id'/'table_name' through the
+// master's DeleteTable RPC, bounded by 'timeout'. Returns the failing status
+// if the RPC could not be completed, or a RemoteError wrapping the master's
+// error when the response carries one.
+Status DeleteTable(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+                   const std::string& table_id,
+                   const std::string& table_name,
+                   const MonoDelta& timeout) {
+  master::DeleteTableRequestPB request;
+  auto* table_identifier = request.mutable_table();
+  table_identifier->set_table_id(table_id);
+  table_identifier->set_table_name(table_name);
+
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  master::DeleteTableResponsePB response;
+  RETURN_NOT_OK(master_proxy->DeleteTable(request, &response, &rpc));
+  RETURN_NOT_OK(rpc.status());
+
+  // The RPC itself went through; surface any error carried in the response.
+  if (!response.has_error()) {
+    return Status::OK();
+  }
+  return Status::RemoteError("Response had an error", SecureShortDebugString(response.error()));
+}
} // namespace itest
} // namespace kudu
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 682857e..b311b95 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -468,5 +468,17 @@ Status GetTsCounterValue(cluster::ExternalTabletServer* ets,
Status SetupAdministratorPrivileges(MiniKdc* kdc,
const HostPort& address);
+// Rename table 'table_id'/'old_table_name' to 'new_table_name' via the master's AlterTable RPC.
+Status AlterTableName(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+ const std::string& table_id,
+ const std::string& old_table_name,
+ const std::string& new_table_name,
+ const MonoDelta& timeout);
+
+// Delete table 'table_id'/'table_name' via the master's DeleteTable RPC, bounded by 'timeout'.
+Status DeleteTable(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+ const std::string& table_id,
+ const std::string& table_name,
+ const MonoDelta& timeout);
} // namespace itest
} // namespace kudu
diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc b/src/kudu/integration-tests/tablet_replacement-itest.cc
index b315267..b71f771 100644
--- a/src/kudu/integration-tests/tablet_replacement-itest.cc
+++ b/src/kudu/integration-tests/tablet_replacement-itest.cc
@@ -157,6 +157,8 @@ void TabletReplacementITest::TestDontEvictIfRemainingConfigIsUnstable(
Substitute("--follower_unavailable_considered_failed_sec=$0", kUnavailableSec),
Substitute("--consensus_rpc_timeout_ms=$0", kConsensusRpcTimeoutSec * 1000),
Substitute("--heartbeat_interval_ms=$0", kTsToMasterHbIntervalSec * 1000),
+ // 'update_tablet_stats_interval_ms' should be larger than 'heartbeat_interval_ms'.
+ Substitute("--update_tablet_stats_interval_ms=$0", (kTsToMasterHbIntervalSec + 1) * 1000),
"--raft_heartbeat_interval_ms=50",
"--enable_leader_failure_detection=false",
};
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 3695591..6f42415 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+#include <stdint.h>
+
#include <cstdlib>
+#include <functional>
#include <map>
#include <memory>
#include <ostream>
@@ -32,19 +35,25 @@
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
+#include "kudu/master/table_metrics.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/rpc/messenger.h"
#include "kudu/tablet/metadata.pb.h"
@@ -54,6 +63,8 @@
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
@@ -67,12 +78,18 @@ DECLARE_bool(catalog_manager_evict_excess_replicas);
DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
DECLARE_bool(enable_leader_failure_detection);
DECLARE_bool(raft_prepare_replacement_before_eviction);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(heartbeat_interval_ms);
+DECLARE_int32(metrics_retirement_age_ms);
+DECLARE_int32(raft_heartbeat_interval_ms);
DEFINE_int32(num_election_test_loops, 3,
"Number of random EmulateElection() loops to execute in "
"TestReportNewLeaderOnLeaderChange");
using kudu::client::KuduClient;
+using kudu::client::KuduInsert;
using kudu::client::KuduSchema;
+using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::cluster::InternalMiniCluster;
@@ -85,8 +102,12 @@ using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RaftPeerPB;
using kudu::itest::SimpleIntKeyKuduSchema;
+using kudu::KuduPartialRow;
+using kudu::master::CatalogManager;
+using kudu::master::Master;
using kudu::master::MasterServiceProxy;
using kudu::master::ReportedTabletPB;
+using kudu::master::TableInfo;
using kudu::master::TabletReportPB;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
@@ -559,5 +580,207 @@ TEST_F(TsTabletManagerITest, TestDeduplicateMasterAddrsForHeartbeaters) {
ASSERT_EQ(1, hb->threads_.size());
}
+TEST_F(TsTabletManagerITest, TestTableStats) {  // End-to-end check of master-side table stats.
+ const int kNumMasters = 3;
+ const int kNumTservers = 3;
+ const int kNumReplicas = 3;
+ const int kNumHashBuckets = 3;
+ const int kRowsCount = rand() % 1000;
+ const string kNewTableName = "NewTableName";
+ client::sp::shared_ptr<KuduTable> table;
+
+ // Shorten the heartbeat and election intervals so that the test runs quickly.
+ FLAGS_heartbeat_interval_ms = 100;
+ FLAGS_raft_heartbeat_interval_ms = 100;
+ FLAGS_leader_failure_max_missed_heartbeat_periods = 2;
+ int64_t kMaxElectionTime =
+ FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods;
+
+ // Return the current LEADER master; CHECK-fails if no leader index can be obtained.
+ const auto GetLeaderMaster = [&] () -> Master* {
+ int idx = 0;
+ Master* master = nullptr;
+ Status s = cluster_->GetLeaderMasterIndex(&idx);
+ if (s.ok()) {
+ master = cluster_->mini_master(idx)->master();
+ }
+ return CHECK_NOTNULL(master);
+ };
+ // Build a MasterServiceProxy bound to the LEADER master's first RPC address.
+ const auto GetLeaderMasterServiceProxy = [&] () -> shared_ptr<MasterServiceProxy> {
+ const auto& addr = GetLeaderMaster()->first_rpc_address();
+ shared_ptr<MasterServiceProxy> proxy(
+ new MasterServiceProxy(client_messenger_, addr, addr.host()));
+ return proxy;
+ };
+ // Under the LEADER master's leader lock, run 'check_function' on the only table.
+ const auto GetLeaderMasterAndRun = [&] (int64_t live_row_count,
+ const std::function<void(TableInfo*, int64_t)>& check_function) {
+ CatalogManager* catalog = GetLeaderMaster()->catalog_manager();
+ master::CatalogManager::ScopedLeaderSharedLock l(catalog);
+ ASSERT_OK(l.first_failed_status());
+ // Fetch all tables from the catalog; the test expects exactly one.
+ vector<scoped_refptr<TableInfo>> table_infos;
+ ASSERT_OK(catalog->GetAllTables(&table_infos));
+ ASSERT_EQ(1, table_infos.size());
+ // Run the check function against that single table.
+ NO_FATALS(check_function(table_infos[0].get(), live_row_count));
+ };
+ // Trigger tserver heartbeats, then wait for the table's live_row_count to match.
+ const auto CheckStats = [&] (int64_t live_row_count) {
+ // Nudge each tserver's stats update time (test hook) and request a heartbeat ASAP.
+ for (int i = 0; i < kNumTservers; ++i) {
+ TabletServer* tserver = CHECK_NOTNULL(cluster_->mini_tablet_server(i)->server());
+ tserver->tablet_manager()->SetNextUpdateTimeForTests();
+ tserver->heartbeater()->TriggerASAP();
+ }
+
+ // Retry the comparison until the gauge on the LEADER master reaches the expected value.
+ NO_FATALS(GetLeaderMasterAndRun(live_row_count, [&] (
+ TableInfo* table_info, int64_t live_row_count) {
+ ASSERT_EVENTUALLY([&] () {
+ ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value());
+ });
+ }));
+ };
+
+ // 1. Start a cluster.
+ {
+ InternalMiniClusterOptions opts;
+ opts.num_masters = kNumMasters;
+ opts.num_tablet_servers = kNumTservers;
+ NO_FATALS(StartCluster(std::move(opts)));
+ ASSERT_EQ(kNumMasters, cluster_->num_masters());
+ ASSERT_EQ(kNumTservers, cluster_->num_tablet_servers());
+ }
+
+ // 2. Create a table and verify that its live row count starts at zero.
+ {
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .add_hash_partitions({ "key" }, kNumHashBuckets)
+ .num_replicas(kNumReplicas)
+ .timeout(MonoDelta::FromSeconds(60))
+ .Create());
+ ASSERT_OK(client_->OpenTable(kTableName, &table));
+ NO_FATALS(CheckStats(0));
+ }
+
+ // 3. Write 'kRowsCount' rows and verify the stats.
+ {
+ client::sp::shared_ptr<KuduSession> session(client_->NewSession());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ for (int i = 0; i < kRowsCount; i++) {
+ KuduInsert* insert = table->NewInsert();
+ KuduPartialRow* row = insert->mutable_row();
+ ASSERT_OK(row->SetInt32(0, i));
+ ASSERT_OK(session->Apply(insert));
+ }
+ ASSERT_OK(session->Flush());
+ NO_FATALS(CheckStats(kRowsCount));
+ }
+
+ // 4. Make every tablet leader step down; the stats must survive leadership changes.
+ {
+ // Build a TServerDetails map so we can check for convergence.
+ itest::TabletServerMap ts_map;
+ ASSERT_OK(CreateTabletServerMap(GetLeaderMasterServiceProxy(), client_messenger_, &ts_map));
+ ValueDeleter deleter(&ts_map);
+
+ // Collect the TabletReplicas so we get direct access to RaftConsensus.
+ vector<scoped_refptr<TabletReplica>> tablet_replicas;
+ ASSERT_OK(PrepareTabletReplicas(MonoDelta::FromSeconds(60), &tablet_replicas));
+ ASSERT_EQ(kNumReplicas * kNumHashBuckets, tablet_replicas.size());
+
+ for (int i = 0; i < kNumReplicas * kNumHashBuckets; ++i) {
+ scoped_refptr<TabletReplica> replica = tablet_replicas[i];
+ if (consensus::RaftPeerPB_Role_LEADER == replica->consensus()->role()) {
+ // Ask the leader replica to step down, then sleep for 'kMaxElectionTime'.
+ itest::TServerDetails* tserver =
+ CHECK_NOTNULL(FindOrDie(ts_map, replica->permanent_uuid()));
+ ASSERT_OK(LeaderStepDown(tserver, replica->tablet_id(), MonoDelta::FromSeconds(10)));
+ SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime));
+ // Check stats after every election.
+ NO_FATALS(CheckStats(kRowsCount));
+ }
+ }
+ }
+
+ // 5. Rename the table.
+ {
+ ASSERT_OK(itest::AlterTableName(GetLeaderMasterServiceProxy(),
+ table->id(), kTableName, kNewTableName, MonoDelta::FromSeconds(5)));
+
+ // Check the table id, the table name in the metric entity's JSON, and the stats.
+ NO_FATALS(GetLeaderMasterAndRun(kRowsCount ,[&] (
+ TableInfo* table_info, int64_t live_row_count) {
+ std::ostringstream out;
+ JsonWriter writer(&out, JsonWriter::PRETTY);
+ ASSERT_OK(table_info->metric_entity_->WriteAsJson(&writer, MetricJsonOptions()));
+ string metric_attrs_str = out.str();
+ ASSERT_STR_NOT_CONTAINS(metric_attrs_str, kTableName);
+ ASSERT_STR_CONTAINS(metric_attrs_str, kNewTableName);
+ ASSERT_EQ(table->id(), table_info->metric_entity_->id());
+ ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value());
+ }));
+ }
+
+ // 6. Restart the masters, always bouncing whichever one is currently the leader.
+ {
+ for (int i = 0; i < kNumMasters; ++i) {
+ int idx = 0;
+ ASSERT_OK(cluster_->GetLeaderMasterIndex(&idx));
+ master::MiniMaster* mini_master = CHECK_NOTNULL(cluster_->mini_master(idx));
+ mini_master->Shutdown();
+ SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime));
+ ASSERT_OK(mini_master->Restart());
+ // The election sometimes does not succeed until the stopped node restarts,
+ // and the restarted node may then be elected leader again. Either way,
+ // wait a heartbeat interval so that all tservers can report to the leader.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+ NO_FATALS(CheckStats(kRowsCount));
+ }
+ }
+
+ // 7. Restart the tservers one at a time, checking the stats after each restart.
+ {
+ for (int i = 0; i < kNumTservers; ++i) {
+ tserver::MiniTabletServer* mini_tserver = CHECK_NOTNULL(cluster_->mini_tablet_server(i));
+ mini_tserver->Shutdown();
+ ASSERT_OK(mini_tserver->Start());
+ SleepFor(MonoDelta::FromMilliseconds(kMaxElectionTime));
+ ASSERT_OK(mini_tserver->WaitStarted());
+ NO_FATALS(CheckStats(kRowsCount));
+ }
+ }
+
+ // 8. Delete the table and verify that its metrics eventually disappear.
+ {
+ ASSERT_OK(itest::DeleteTable(GetLeaderMasterServiceProxy(),
+ table->id(), kNewTableName, MonoDelta::FromSeconds(5)));
+
+ FLAGS_metrics_retirement_age_ms = -1;
+ MetricRegistry* metric_registry = CHECK_NOTNULL(GetLeaderMaster()->metric_registry());
+ const auto GetMetricsString = [&] (string* ret) {
+ std::ostringstream out;
+ JsonWriter writer(&out, JsonWriter::PRETTY);
+ ASSERT_OK(metric_registry->WriteAsJson(&writer, MetricJsonOptions()));
+ *ret = out.str();
+ };
+
+ string metrics_str;
+ // On the first call, the metric is returned and internally retired, but it is not deleted.
+ NO_FATALS(GetMetricsString(&metrics_str));
+ ASSERT_STR_CONTAINS(metrics_str, kNewTableName);
+ // On the second call, the metric is returned and then deleted.
+ NO_FATALS(GetMetricsString(&metrics_str));
+ ASSERT_STR_CONTAINS(metrics_str, kNewTableName);
+ // On the third call, the metric is no longer available.
+ NO_FATALS(GetMetricsString(&metrics_str));
+ ASSERT_STR_NOT_CONTAINS(metrics_str, kNewTableName);
+ }
+}
+
} // namespace tserver
} // namespace kudu
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index a8098e8..fc6aa05 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -49,6 +49,7 @@ set(MASTER_SRCS
sentry_privileges_cache_metrics.cc
sentry_privileges_fetcher.cc
sys_catalog.cc
+ table_metrics.cc
ts_descriptor.cc
ts_manager.cc)
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index e37b1eb..fd67c82 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -108,6 +108,7 @@
#include "kudu/master/placement_policy.h"
#include "kudu/master/sentry_authz_provider.h"
#include "kudu/master/sys_catalog.h"
+#include "kudu/master/table_metrics.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/messenger.h"
@@ -271,6 +272,8 @@ DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_bool(raft_attempt_to_replace_replica_without_majority);
DECLARE_int64(tsk_rotation_seconds);
+METRIC_DEFINE_entity(table);
+
using base::subtle::NoBarrier_CompareAndSwap;
using base::subtle::NoBarrier_Load;
using boost::make_optional;
@@ -300,6 +303,7 @@ using kudu::security::TokenSigner;
using kudu::security::TokenSigningPrivateKey;
using kudu::security::TokenSigningPrivateKeyPB;
using kudu::security::TokenSigningPublicKeyPB;
+using kudu::tablet::ReportedTabletStatsPB;
using kudu::tablet::TABLET_DATA_DELETED;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tablet::TabletDataState;
@@ -362,6 +366,9 @@ class TableLoader : public TableVisitor {
l.Commit();
if (!is_deleted) {
+ // It's unnecessary to register metrics for the deleted tables.
+ table->RegisterMetrics(catalog_manager_->master_->metric_registry(),
+ CatalogManager::NormalizeTableName(metadata.name()));
LOG(INFO) << Substitute("Loaded metadata for table $0", table->ToString());
}
VLOG(2) << Substitute("Metadata for table $0: $1",
@@ -1778,6 +1785,7 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(
partition_schema.ToPB(metadata->mutable_partition_schema());
metadata->set_create_timestamp(time(nullptr));
(*metadata->mutable_extra_config()) = std::move(extra_config_pb);
+ table->RegisterMetrics(master_->metric_registry(), metadata->name());
return table;
}
@@ -2049,6 +2057,7 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
<< " from map in response to DeleteTable request: "
<< SecureShortDebugString(req);
}
+ table->UnregisterMetrics();
}
// 5. Commit the dirty tablet state.
@@ -2675,6 +2684,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
<< SecureShortDebugString(req);
}
InsertOrDie(&normalized_table_names_map_, normalized_new_table_name, table);
+
+ // Alter the table name in the attributes of the metrics.
+ table->UpdateMetricsAttrs(normalized_new_table_name);
}
// Insert new tablets into the global tablet map. After this, the tablets
@@ -4176,12 +4188,23 @@ Status CatalogManager::ProcessTabletReport(
if (tablet_was_mutated) {
mutated_tablets.push_back(tablet);
}
+
+ // 10. Process the report's tablet statistics.
+ if (report.has_stats() && report.has_consensus_state()) {
+ DCHECK(ts_desc->permanent_uuid() == report.consensus_state().leader_uuid());
+
+ // Currently, a tserver only reports stats for the replicas it leads.
+ // First, update table's stats.
+ tablet->table()->UpdateMetrics(tablet->GetStats(), report.stats());
+ // Then, update tablet's stats.
+ tablet->UpdateStats(report.stats());
+ }
}
- // 10. Unlock the tables; we no longer need to access their state.
+ // 11. Unlock the tables; we no longer need to access their state.
tables_lock.Unlock();
- // 11. Write all tablet mutations to the catalog table.
+ // 12. Write all tablet mutations to the catalog table.
//
// SysCatalogTable::Write will short-circuit the case where the data has not
// in fact changed since the previous version and avoid any unnecessary mutations.
@@ -4198,10 +4221,10 @@ Status CatalogManager::ProcessTabletReport(
// Having successfully written the tablet mutations, this function cannot
// fail from here on out.
- // 12. Publish the in-memory tablet mutations and release the locks.
+ // 13. Publish the in-memory tablet mutations and release the locks.
tablets_lock.Commit();
- // 13. Process all tablet schema version changes.
+ // 14. Process all tablet schema version changes.
//
// This is separate from tablet state mutations because only tablet in-memory
// state (and table on-disk state) is changed.
@@ -4214,7 +4237,7 @@ Status CatalogManager::ProcessTabletReport(
}
}
- // 14. Send all queued RPCs.
+ // 15. Send all queued RPCs.
for (auto& rpc : rpcs) {
if (rpc->table() != nullptr) {
rpc->table()->AddTask(rpc.get());
@@ -5302,6 +5325,17 @@ string TabletInfo::ToString() const {
(table_ != nullptr ? table_->ToString() : "MISSING"));
}
+
+// Replaces the cached stats with 'stats'; the assignment happens while
+// holding 'lock_'.
+void TabletInfo::UpdateStats(ReportedTabletStatsPB stats) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  stats_ = std::move(stats);
+}
+
+// Returns a copy of the cached stats, taken while holding 'lock_'.
+ReportedTabletStatsPB TabletInfo::GetStats() const {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  return stats_;
+}
+
void PersistentTabletInfo::set_state(SysTabletsEntryPB::State state, const string& msg) {
pb.set_state(state);
pb.set_state_msg(msg);
@@ -5328,6 +5362,8 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
const auto& lower_bound = tablet->metadata().state().pb.partition().partition_key_start();
CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
DecrementSchemaVersionCountUnlocked(tablet->reported_schema_version());
+ // Update the table metrics for the deleted tablets.
+ UpdateMetrics(tablet->GetStats(), ReportedTabletStatsPB());
}
for (const auto& tablet : tablets_to_add) {
TabletInfo* old = nullptr;
@@ -5462,6 +5498,39 @@ void TableInfo::GetAllTablets(vector<scoped_refptr<TabletInfo>>* ret) const {
}
}
+// Creates the table's metric entity in 'metric_registry', keyed by the table
+// id and tagged with a "table_name" attribute, then instantiates the table
+// metrics on that entity. Does nothing when 'metric_registry' is null.
+void TableInfo::RegisterMetrics(MetricRegistry* metric_registry, const string& table_name) {
+  if (!metric_registry) {
+    return;
+  }
+  MetricEntity::AttributeMap entity_attrs;
+  entity_attrs["table_name"] = table_name;
+  metric_entity_ = METRIC_ENTITY_table.Instantiate(metric_registry, table_id_, entity_attrs);
+  metrics_.reset(new TableMetrics(metric_entity_));
+}
+
+// Unpublishes the table's metric entity; no-op if none was registered.
+void TableInfo::UnregisterMetrics() {
+  if (!metric_entity_) {
+    return;
+  }
+  metric_entity_->Unpublish();
+}
+
+// Adjusts the table-level gauges by the difference between a tablet's
+// previously recorded stats ('old_stats') and its latest ones ('new_stats').
+// No-op when the table has no metrics registered.
+void TableInfo::UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
+                              const tablet::ReportedTabletStatsPB& new_stats) {
+  if (!metrics_) {
+    return;
+  }
+  const auto disk_size_delta = new_stats.on_disk_size() - old_stats.on_disk_size();
+  const auto row_count_delta = new_stats.live_row_count() - old_stats.live_row_count();
+  metrics_->on_disk_size->IncrementBy(disk_size_delta);
+  metrics_->live_row_count->IncrementBy(row_count_delta);
+}
+
+// Points the metric entity's "table_name" attribute at 'new_table_name';
+// no-op if no metric entity was registered.
+void TableInfo::UpdateMetricsAttrs(const string& new_table_name) {
+  if (!metric_entity_) {
+    return;
+  }
+  metric_entity_->SetAttribute("table_name", new_table_name);
+}
+
+// Returns a non-owning pointer to the table metrics; null when no metrics
+// have been registered.
+const TableMetrics* TableInfo::GetMetrics() const {
+  const TableMetrics* table_metrics = metrics_.get();
+  return table_metrics;
+}
+
void TableInfo::IncrementSchemaVersionCountUnlocked(int64_t version) {
DCHECK(lock_.is_write_locked());
schema_version_counts_[version]++;
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index aa08b0d..3e89815 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -43,6 +43,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/master/master.pb.h"
+#include "kudu/tablet/metadata.pb.h"
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/cow_object.h"
@@ -59,13 +60,15 @@ namespace kudu {
class AuthzTokenTest_TestSingleMasterUnavailable_Test;
class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
+class MetricEntity;
+class MetricRegistry;
class MonitoredTask;
class NodeInstancePB;
class PartitionPB;
class PartitionSchema;
class Schema;
-class ThreadPool;
class TableExtraConfigPB;
+class ThreadPool;
struct ColumnId;
// Working around FRIEND_TEST() ugliness.
@@ -93,6 +96,10 @@ class TokenSigner;
class TokenSigningPublicKeyPB;
} // namespace security
+namespace tserver {
+class TsTabletManagerITest_TestTableStats_Test;
+}
+
namespace tablet {
class TabletReplica;
} // namespace tablet
@@ -108,6 +115,7 @@ class SysCatalogTable;
class TSDescriptor;
class TableInfo;
struct DeferredAssignmentActions;
+struct TableMetrics;
// The data related to a tablet which is persisted on disk.
// This portion of TableInfo is managed via CowObject.
@@ -184,6 +192,12 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo> {
// No synchronization needed.
std::string ToString() const;
+ // Replace the cached tablet stats with 'stats'.
+ void UpdateStats(tablet::ReportedTabletStatsPB stats);
+
+ // Return a copy of the cached tablet stats.
+ tablet::ReportedTabletStatsPB GetStats() const;
+
private:
friend class RefCountedThreadSafe<TabletInfo>;
~TabletInfo();
@@ -205,6 +219,9 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo> {
// Set to NOT_YET_REPORTED when the tablet hasn't yet reported.
int64_t reported_schema_version_;
+ // Cached stats for the LEADER replica.
+ tablet::ReportedTabletStatsPB stats_;
+
DISALLOW_COPY_AND_ASSIGN(TabletInfo);
};
@@ -300,9 +317,27 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
return tablet_map_.size();
}
+ // Create the table's metric entity and metrics; no-op if 'metric_registry' is null.
+ void RegisterMetrics(MetricRegistry* metric_registry, const std::string& table_name);
+
+ // Unpublish the table's metric entity, if one was registered.
+ void UnregisterMetrics();
+
+ // Adjust the table metrics by the difference between 'new_stats' and 'old_stats'.
+ void UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
+ const tablet::ReportedTabletStatsPB& new_stats);
+
+ // Set the "table_name" attribute of the table's metric entity to 'new_table_name'.
+ void UpdateMetricsAttrs(const std::string& new_table_name);
+
+ // Return the table metrics, or null if none were registered.
+ const TableMetrics* GetMetrics() const;
+
private:
friend class RefCountedThreadSafe<TableInfo>;
friend class TabletInfo;
+ FRIEND_TEST(kudu::tserver::TsTabletManagerITest, TestTableStats);
+
~TableInfo();
// Increments or decrements the value for the key 'version' in
@@ -338,6 +373,9 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
// tablet_map_ and summing up the tablets' reported schema versions.
std::map<int64_t, int64_t> schema_version_counts_;
+ scoped_refptr<MetricEntity> metric_entity_;
+ std::unique_ptr<TableMetrics> metrics_;
+
DISALLOW_COPY_AND_ASSIGN(TableInfo);
};
@@ -1011,7 +1049,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
rpc::RpcContext* rpc) WARN_UNUSED_RESULT;
- // TODO: the maps are a little wasteful of RAM, since the TableInfo/TabletInfo
+ // TODO(unknown): the maps are a little wasteful of RAM, since the TableInfo/TabletInfo
// objects have a copy of the string key. But STL doesn't make it
// easy to make a "gettable set".
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 6df4836..fc5fef7 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -246,6 +246,10 @@ message ReportedTabletPB {
optional AppStatusPB error = 4;
optional uint32 schema_version = 5;
+
+ // Tablet statistics.
+ // This is only included in the report if the replica is a LEADER.
+ optional tablet.ReportedTabletStatsPB stats = 7;
}
// Sent by the tablet server to report the set of tablets hosted by that TS.
diff --git a/src/kudu/master/master_path_handlers.cc b/src/kudu/master/master_path_handlers.cc
index 7688796..2b926b8 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -42,10 +42,12 @@
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
+#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/ascii_ctype.h"
+#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
@@ -55,6 +57,7 @@
#include "kudu/master/master.pb.h"
#include "kudu/master/master_options.h"
#include "kudu/master/sys_catalog.h"
+#include "kudu/master/table_metrics.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/server/monitored_task.h"
@@ -62,6 +65,7 @@
#include "kudu/util/cow_object.h"
#include "kudu/util/easy_json.h"
#include "kudu/util/jsonwriter.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
@@ -424,6 +428,19 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
}
}
+ const TableMetrics* table_metrics = table->GetMetrics();
+ if (table_metrics) {
+ // If the table doesn't support live row counts, the value will be negative.
+ // But the value of disk size will never be negative.
+ (*output)["table_disk_size"] =
+ HumanReadableNumBytes::ToString(table_metrics->on_disk_size->value());
+ int64 live_row_count = table_metrics->live_row_count->value();
+ if (live_row_count >= 0) {
+ (*output)["table_live_row_count"] = live_row_count;
+ } else {
+ (*output)["table_live_row_count"] = "N/A";
+ }
+ }
(*output)["partition_schema"] = partition_schema.DisplayString(schema, range_partitions);
string str_extra_configs;
diff --git a/src/kudu/master/table_metrics.cc b/src/kudu/master/table_metrics.cc
new file mode 100644
index 0000000..079379a
--- /dev/null
+++ b/src/kudu/master/table_metrics.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/master/table_metrics.h"
+
+
+namespace kudu {
+namespace master {
+
+// Table-level gauges, defined on the 'table' metric entity.
+METRIC_DEFINE_gauge_int64(table, on_disk_size, "Table Size On Disk",
+ kudu::MetricUnit::kBytes,
+ "Pre-replication aggregated disk space used by all tablets in this table, "
+ "including metadata.");
+METRIC_DEFINE_gauge_int64(table, live_row_count, "Table Live Row count",
+ kudu::MetricUnit::kRows,
+ "Pre-replication aggregated number of live rows in this table.");
+
+// Instantiates both table gauges on 'entity', each with an initial value of 0.
+TableMetrics::TableMetrics(const scoped_refptr<MetricEntity>& entity)
+    : on_disk_size(METRIC_on_disk_size.Instantiate(entity, 0)),
+      live_row_count(METRIC_live_row_count.Instantiate(entity, 0)) {
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/table_metrics.h b/src/kudu/master/table_metrics.h
new file mode 100644
index 0000000..26591b0
--- /dev/null
+++ b/src/kudu/master/table_metrics.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_MASTER_TABLE_METRICS_H
+#define KUDU_MASTER_TABLE_METRICS_H
+
+#include <cstdint>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/metrics.h"
+
+namespace kudu {
+namespace master {
+
+// Per-table metrics, built from the metrics of the table's LEADER tablet
+// replicas.
+//
+// The tservers periodically update tablet metrics based on the gflag
+// FLAGS_update_tablet_stats_interval_ms. Each tserver then sends the
+// metrics of its own LEADER tablets to all of the masters through heartbeat
+// messages, but only the LEADER master aggregates and exposes them.
+// These metrics are pre-replication.
+//
+// Note: the process is asynchronous, so the data lag behind the tservers.
+//
+// The metrics may also fluctuate when a tablet's leadership changes. And
+// when a new LEADER master is elected, the metrics may not be accurate
+// until all tservers have reported; that time window should be
+// FLAGS_heartbeat_interval_ms.
+struct TableMetrics {
+ // Instantiates the gauges below on the given metric entity.
+ explicit TableMetrics(const scoped_refptr<MetricEntity>& entity);
+
+ // Gauge for the table's 'on_disk_size' metric.
+ scoped_refptr<AtomicGauge<int64_t>> on_disk_size;
+ // Gauge for the table's 'live_row_count' metric.
+ scoped_refptr<AtomicGauge<int64_t>> live_row_count;
+};
+
+} // namespace master
+} // namespace kudu
+
+#endif
diff --git a/src/kudu/tablet/local_tablet_writer.h b/src/kudu/tablet/local_tablet_writer.h
index 1419aaf..777bad4 100644
--- a/src/kudu/tablet/local_tablet_writer.h
+++ b/src/kudu/tablet/local_tablet_writer.h
@@ -23,10 +23,12 @@
#include "kudu/common/row_operations.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/macros.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/gutil/macros.h"
namespace kudu {
namespace tablet {
@@ -116,6 +118,13 @@ class LocalTabletWriter {
}
op_idx++;
}
+
+ // Update the metrics.
+ TabletMetrics* metrics = tablet_->metrics();
+ if (metrics) {
+ metrics->rows_inserted->IncrementBy(op_idx);
+ }
+
return Status::OK();
}
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 8f19b2b..bff3a2e 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -193,3 +193,9 @@ enum TabletStatePB {
// The Tablet has been completely shut down.
SHUTDOWN = 4;
}
+
+// Statistics for a tablet replica.
+message ReportedTabletStatsPB {
+ // On-disk size of the replica, as returned by TabletReplica::OnDiskSize().
+ required int64 on_disk_size = 1;
+ // Live row count of the replica, as returned by
+ // TabletReplica::CountLiveRows().
+ required int64 live_row_count = 2;
+}
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 9a58352..3decd0e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -91,7 +91,7 @@ METRIC_DEFINE_gauge_string(tablet, state, "Tablet State",
"State of this tablet.");
METRIC_DEFINE_gauge_int64(tablet, live_row_count, "Tablet Live Row Count",
kudu::MetricUnit::kRows,
- "Number of live rows in this tablet, excludes deleted rows."
+ "Number of live rows in this tablet, excludes deleted rows. "
"When the tablet doesn't support live row counting, -1 will "
"be returned.");
@@ -819,6 +819,36 @@ int64_t TabletReplica::CountLiveRows() const {
return ret;
}
+// Recomputes this replica's on-disk size and live row count and caches them
+// in 'stats_pb_'. When the values differ from the cached ones and this
+// replica is the LEADER, its tablet id is appended to 'dirty_tablets'.
+// Does nothing unless the replica is RUNNING.
+void TabletReplica::UpdateTabletStats(vector<string>* dirty_tablets) {
+  // 'consensus_' must not be touched before the state has been checked.
+  if (state() != RUNNING) {
+    return;
+  }
+
+  ReportedTabletStatsPB fresh;
+  fresh.set_on_disk_size(OnDiskSize());
+  fresh.set_live_row_count(CountLiveRows());
+
+  // Read the role before taking 'lock_': RaftConsensus::role() may invoke
+  // TabletReplica::StartFollowerTransaction(), so calling it with 'lock_'
+  // held could deadlock.
+  const RaftPeerPB::Role current_role = consensus_->role();
+
+  std::lock_guard<simple_spinlock> l(lock_);
+  const bool unchanged =
+      stats_pb_.on_disk_size() == fresh.on_disk_size() &&
+      stats_pb_.live_row_count() == fresh.live_row_count();
+  if (unchanged) {
+    return;
+  }
+
+  // Only the LEADER's changes are collected, but the cache is refreshed for
+  // every role.
+  if (current_role == consensus::RaftPeerPB_Role_LEADER) {
+    dirty_tablets->emplace_back(tablet_id());
+  }
+  stats_pb_.Swap(&fresh);
+}
+
+// Returns a copy of the cached stats ('stats_pb_'), taken while holding
+// 'lock_'. The cache is refreshed by UpdateTabletStats().
+ReportedTabletStatsPB TabletReplica::GetTabletStats() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return stats_pb_;
+}
+
void TabletReplica::MakeUnavailable(const Status& error) {
std::shared_ptr<Tablet> tablet;
{
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index f27faed..2d603fd 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -304,6 +304,14 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
// -1 will be returned if the tablet doesn't support live row counting.
int64_t CountLiveRows() const;
+ // Update the tablet stats.
+ // When the replica's stats change and it's the LEADER, it is added to
+ // the 'dirty_tablets'.
+ void UpdateTabletStats(std::vector<std::string>* dirty_tablets);
+
+ // Return the tablet stats.
+ ReportedTabletStatsPB GetTabletStats() const;
+
private:
friend class kudu::AlterTableTest;
friend class RefCountedThreadSafe<TabletReplica>;
@@ -381,6 +389,9 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
FunctionGaugeDetacher metric_detacher_;
+ // Cached stats for the tablet replica.
+ ReportedTabletStatsPB stats_pb_;
+
DISALLOW_COPY_AND_ASSIGN(TabletReplica);
};
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 1a58333..3216ffd 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -110,7 +110,6 @@ using kudu::master::TabletReportPB;
using kudu::pb_util::SecureDebugString;
using kudu::rpc::ErrorStatusPB;
using kudu::rpc::RpcController;
-using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
@@ -130,7 +129,7 @@ class Heartbeater::Thread {
Status Start();
Status Stop();
void TriggerASAP();
- void MarkTabletDirty(const string& tablet_id, const string& reason);
+ void MarkTabletsDirty(const vector<string>& tablet_ids, const string& reason);
void GenerateIncrementalTabletReport(TabletReportPB* report);
void GenerateFullTabletReport(TabletReportPB* report);
@@ -218,7 +217,6 @@ class Heartbeater::Thread {
Heartbeater::Heartbeater(UnorderedHostPortSet master_addrs, TabletServer* server) {
DCHECK_GT(master_addrs.size(), 0);
-
for (auto addr : master_addrs) {
threads_.emplace_back(new Thread(std::move(addr), server));
}
@@ -261,9 +259,9 @@ void Heartbeater::TriggerASAP() {
}
}
-void Heartbeater::MarkTabletDirty(const string& tablet_id, const string& reason) {
+void Heartbeater::MarkTabletsDirty(const vector<string>& tablet_ids, const string& reason) {
for (const auto& thread : threads_) {
- thread->MarkTabletDirty(tablet_id, reason);
+ thread->MarkTabletsDirty(tablet_ids, reason);
}
}
@@ -377,6 +375,9 @@ int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB* error,
ErrorStatusPB* error_status) {
+ // Update the tablet statistics if necessary.
+ server_->tablet_manager()->UpdateTabletStatsIfNecessary();
+
if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
return Status::IOError("failing all heartbeats for tests");
}
@@ -665,28 +666,31 @@ void Heartbeater::Thread::TriggerASAP() {
cond_.Signal();
}
-void Heartbeater::Thread::MarkTabletDirty(const string& tablet_id, const string& reason) {
+void Heartbeater::Thread::MarkTabletsDirty(const vector<string>& tablet_ids,
+ const string& /*reason*/) {
std::lock_guard<simple_spinlock> l(dirty_tablets_lock_);
// Even though this is an atomic load, it needs to hold the lock. To see why,
// consider this sequence:
// 0. Tablet t exists in dirty_tablets_.
- // 1. T1 calls MarkTabletDirty(t), loads x from next_report_seq_, and is
+ // 1. T1 calls MarkTabletsDirty(t), loads x from next_report_seq_, and is
// descheduled.
// 2. T2 generates a tablet report, incrementing next_report_seq_ to x+1.
- // 3. T3 calls MarkTabletDirty(t), loads x+1 into next_report_seq_, and
+ // 3. T3 calls MarkTabletsDirty(t), loads x+1 into next_report_seq_, and
// writes x+1 to state->change_seq.
// 4. T1 is scheduled. It tries to write x to state->change_seq, failing the
// CHECK_GE().
int32_t seqno = next_report_seq_.load();
- TabletReportState* state = FindOrNull(dirty_tablets_, tablet_id);
- if (state != nullptr) {
- CHECK_GE(seqno, state->change_seq);
- state->change_seq = seqno;
- } else {
- TabletReportState state = { seqno };
- InsertOrDie(&dirty_tablets_, tablet_id, state);
+ for (const auto& tablet_id : tablet_ids) {
+ TabletReportState* state = FindOrNull(dirty_tablets_, tablet_id);
+ if (state != nullptr) {
+ CHECK_GE(seqno, state->change_seq);
+ state->change_seq = seqno;
+ } else {
+ TabletReportState state = { seqno };
+ InsertOrDie(&dirty_tablets_, tablet_id, state);
+ }
}
}
diff --git a/src/kudu/tserver/heartbeater.h b/src/kudu/tserver/heartbeater.h
index e755b11..659e30f 100644
--- a/src/kudu/tserver/heartbeater.h
+++ b/src/kudu/tserver/heartbeater.h
@@ -27,10 +27,10 @@
#include "kudu/util/status.h"
namespace kudu {
-
-namespace master {
-class TabletReportPB;
-}
+
+namespace master {
+class TabletReportPB;
+}
namespace tserver {
@@ -52,12 +52,12 @@ class Heartbeater {
// heartbeat interval has not expired.
void TriggerASAP();
- // Mark the given tablet as dirty, or do nothing if it is already dirty.
+ // Mark the given tablets as dirty, or do nothing if they are already dirty.
//
// Tablet dirtiness is tracked separately for each master. Dirty tablets are
// included in the heartbeat's tablet report, and only marked not dirty once
// the report has been acknowledged by the master.
- void MarkTabletDirty(const std::string& tablet_id, const std::string& reason);
+ void MarkTabletsDirty(const std::vector<std::string>& tablet_ids, const std::string& reason);
~Heartbeater();
diff --git a/src/kudu/tserver/ts_tablet_manager-test.cc b/src/kudu/tserver/ts_tablet_manager-test.cc
index 6e5544d..48ea188 100644
--- a/src/kudu/tserver/ts_tablet_manager-test.cc
+++ b/src/kudu/tserver/ts_tablet_manager-test.cc
@@ -24,10 +24,12 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/consensus/metadata.pb.h"
@@ -37,6 +39,8 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/master/master.pb.h"
+#include "kudu/tablet/local_tablet_writer.h"
+#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
@@ -51,6 +55,8 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+// The flag is defined in ts_tablet_manager.cc as
+// 'update_tablet_stats_interval_ms'.
+DECLARE_int32(update_tablet_stats_interval_ms);
+
#define ASSERT_REPORT_HAS_UPDATED_TABLET(report, tablet_id) \
NO_FATALS(AssertReportHasUpdatedTablet(report, tablet_id))
@@ -71,12 +77,14 @@ using consensus::RaftConfigPB;
using master::ReportedTabletPB;
using master::TabletReportPB;
using pb_util::SecureShortDebugString;
+using tablet::LocalTabletWriter;
+using tablet::Tablet;
using tablet::TabletReplica;
class TsTabletManagerTest : public KuduTest {
public:
TsTabletManagerTest()
- : schema_({ ColumnSchema("key", UINT32) }, 1) {
+ : schema_({ ColumnSchema("key", INT32) }, 1) {
}
virtual void SetUp() OVERRIDE {
@@ -139,6 +147,15 @@ class TsTabletManagerTest : public KuduTest {
heartbeater_->MarkTabletReportsAcknowledgedForTests({ report });
}
+  // Inserts 'count' rows into 'tablet' through a LocalTabletWriter, using
+  // 0, 1, ..., count - 1 as the value of column 0.
+  void InsertTestRows(Tablet* tablet, int64_t count) {
+    LocalTabletWriter writer(tablet, &schema_);
+    KuduPartialRow row(&schema_);
+    int64_t key = 0;
+    while (key < count) {
+      ASSERT_OK(row.SetInt32(0, key));
+      ASSERT_OK(writer.Insert(row));
+      key++;
+    }
+  }
+
protected:
gscoped_ptr<MiniTabletServer> mini_server_;
FsManager* fs_manager_;
@@ -248,7 +265,6 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
ASSERT_TRUE(report.is_incremental());
ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
}
-
ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
// If we don't acknowledge the report, and ask for another incremental report,
@@ -308,5 +324,73 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
}
+// Verifies that tablet statistics show up in incremental tablet reports once
+// they have been updated, and that only tablets whose statistics changed are
+// reported again.
+TEST_F(TsTabletManagerTest, TestTabletStatsReports) {
+ TabletReportPB report;
+ int64_t seqno = -1;
+ const int64_t kCount = 12;
+
+ // 1. Create two tablets.
+ scoped_refptr<tablet::TabletReplica> replica1;
+ ASSERT_OK(CreateNewTablet("tablet-1", schema_, boost::none, boost::none, &replica1));
+ ASSERT_OK(CreateNewTablet("tablet-2", schema_, boost::none, boost::none, nullptr));
+
+ // 2. Do a full report - should include both tablets, but neither of them
+ // carries statistics yet.
+ NO_FATALS(GenerateFullTabletReport(&report));
+ ASSERT_FALSE(report.is_incremental());
+ ASSERT_EQ(2, report.updated_tablets().size());
+ ASSERT_FALSE(report.updated_tablets(0).has_stats());
+ ASSERT_FALSE(report.updated_tablets(1).has_stats());
+ ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
+ MarkTabletReportAcknowledged(report);
+
+ // 3. Trigger updates to tablet statistics as soon as possible.
+ tablet_manager_->SetNextUpdateTimeForTests();
+ heartbeater_->TriggerASAP();
+
+ // Do an incremental report - should include both tablets along with their
+ // statistics: a non-zero on-disk size and zero live rows.
+ ASSERT_EVENTUALLY([&] () {
+ NO_FATALS(GenerateIncrementalTabletReport(&report));
+ ASSERT_TRUE(report.is_incremental());
+ ASSERT_EQ(2, report.updated_tablets().size());
+ });
+ ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
+ for (int i = 0; i < 2; ++i) {
+ ASSERT_TRUE(report.updated_tablets(i).has_stats());
+ ASSERT_GT(report.updated_tablets(i).stats().on_disk_size(), 0);
+ ASSERT_EQ(0, report.updated_tablets(i).stats().live_row_count());
+ }
+ ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
+ ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-2");
+ MarkTabletReportAcknowledged(report);
+
+ // Drain the dirty tablets that are still pending (not acknowledged above
+ // because of the seqno race): keep generating and acknowledging
+ // incremental reports until one comes back empty.
+ ASSERT_EVENTUALLY([&] () {
+ NO_FATALS(GenerateIncrementalTabletReport(&report));
+ ASSERT_TRUE(report.is_incremental());
+ MarkTabletReportAcknowledged(report);
+ ASSERT_EQ(0, report.updated_tablets().size());
+ });
+ ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
+
+ // 4. Write some test rows to 'tablet-1'.
+ NO_FATALS(InsertTestRows(replica1->tablet(), kCount));
+
+ // Trigger updates to tablet statistics as soon as possible again.
+ tablet_manager_->SetNextUpdateTimeForTests();
+ heartbeater_->TriggerASAP();
+
+ // Do an incremental report - should include only 'tablet-1', with a live
+ // row count equal to the number of rows just inserted.
+ ASSERT_EVENTUALLY([&] () {
+ NO_FATALS(GenerateIncrementalTabletReport(&report));
+ ASSERT_TRUE(report.is_incremental());
+ ASSERT_EQ(1, report.updated_tablets().size());
+ });
+ ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
+ ASSERT_GT(report.updated_tablets(0).stats().on_disk_size(), 0);
+ ASSERT_EQ(kCount, report.updated_tablets(0).stats().live_row_count());
+ ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
+ MarkTabletReportAcknowledged(report);
+}
+
} // namespace tserver
} // namespace kudu
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index d4a404c..5f5acb8 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -65,6 +65,7 @@
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -132,6 +133,11 @@ DEFINE_int32(delete_tablet_inject_latency_ms, 0,
"Amount of delay in milliseconds to inject into delete tablet operations.");
TAG_FLAG(delete_tablet_inject_latency_ms, unsafe);
+// How often the tablet statistics are refreshed; the value is consumed via
+// MonoDelta::FromMilliseconds(). The flag validator in this file rejects
+// values smaller than --heartbeat_interval_ms.
+DEFINE_int32(update_tablet_stats_interval_ms, 5000,
+             "Interval, in milliseconds, at which the tablet statistics should be "
+             "updated. Should be greater than or equal to 'heartbeat_interval_ms'.");
+TAG_FLAG(update_tablet_stats_interval_ms, advanced);
+
DECLARE_bool(raft_prepare_replacement_before_eviction);
METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized,
@@ -174,6 +180,8 @@ METRIC_DEFINE_gauge_int32(server, tablets_num_shutdown,
kudu::MetricUnit::kTablets,
"Number of tablets currently shut down");
+DECLARE_int32(heartbeat_interval_ms);
+
using std::set;
using std::shared_ptr;
using std::string;
@@ -199,6 +207,7 @@ using fs::DataDirManager;
using log::Log;
using master::ReportedTabletPB;
using master::TabletReportPB;
+using tablet::ReportedTabletStatsPB;
using tablet::Tablet;
using tablet::TABLET_DATA_COPYING;
using tablet::TABLET_DATA_DELETED;
@@ -211,6 +220,20 @@ using tserver::TabletCopyClient;
namespace tserver {
+namespace {
+// Group flag validator for --update_tablet_stats_interval_ms.
+//
+// Returns false, after logging an error, when the stats update interval is
+// shorter than --heartbeat_interval_ms; equal values are accepted.
+bool ValidateUpdateTabletStatsInterval() {
+  if (FLAGS_update_tablet_stats_interval_ms < FLAGS_heartbeat_interval_ms) {
+    // Note the trailing spaces: the pieces are streamed back to back.
+    LOG(ERROR) << "Tablet stats updating interval (--update_tablet_stats_interval_ms) "
+               << "should be greater than or equal to heartbeat interval "
+               << "(--heartbeat_interval_ms)";
+    return false;
+  }
+
+  return true;
+}
+} // anonymous namespace
+
+GROUP_FLAG_VALIDATOR(update_tablet_stats_interval_ms, ValidateUpdateTabletStatsInterval);
+
TSTabletManager::TSTabletManager(TabletServer* server)
: fs_manager_(server->fs_manager()),
cmeta_manager_(new ConsensusMetadataManager(fs_manager_)),
@@ -218,6 +241,9 @@ TSTabletManager::TSTabletManager(TabletServer* server)
metric_registry_(server->metric_registry()),
tablet_copy_metrics_(server->metric_entity()),
state_(MANAGER_INITIALIZING) {
+ next_update_time_ = MonoTime::Now() +
+ MonoDelta::FromMilliseconds(FLAGS_update_tablet_stats_interval_ms);
+
METRIC_tablets_num_not_initialized.InstantiateFunctionGauge(
server->metric_entity(),
Bind(&TSTabletManager::RefreshTabletStateCacheAndReturnCount,
@@ -1222,11 +1248,15 @@ void TSTabletManager::GetTabletReplicas(vector<scoped_refptr<TabletReplica> >* r
AppendValuesFromMap(tablet_map_, replicas);
}
-void TSTabletManager::MarkTabletDirty(const std::string& tablet_id, const std::string& reason) {
+void TSTabletManager::MarkTabletDirty(const string& tablet_id, const string& reason) {
VLOG(2) << Substitute("$0 Marking dirty. Reason: $1. Will report this "
"tablet to the Master in the next heartbeat",
LogPrefix(tablet_id), reason);
- server_->heartbeater()->MarkTabletDirty(tablet_id, reason);
+ MarkTabletsDirty({ tablet_id }, reason);
+}
+
+void TSTabletManager::MarkTabletsDirty(const vector<string>& tablet_ids, const string& reason) {
+ server_->heartbeater()->MarkTabletsDirty(tablet_ids, reason);
server_->heartbeater()->TriggerASAP();
}
@@ -1290,6 +1320,16 @@ void TSTabletManager::CreateReportedTabletPB(const scoped_refptr<TabletReplica>&
*reported_tablet->mutable_consensus_state() = std::move(cstate);
}
}
+
+ // First, the replica's state should be RUNNING already.
+ // Then fill in the replica's stats only when it's LEADER.
+ if (tablet::RUNNING == replica->state() &&
+ consensus::RaftPeerPB_Role_LEADER == consensus->role()) {
+ ReportedTabletStatsPB stats_pb = replica->GetTabletStats();
+ if (stats_pb.IsInitialized()) {
+ *reported_tablet->mutable_stats() = std::move(stats_pb);
+ }
+ }
}
void TSTabletManager::PopulateFullTabletReport(TabletReportPB* report) const {
@@ -1540,6 +1580,36 @@ Status TSTabletManager::WaitForNoTransitionsForTests(const MonoDelta& timeout) c
timeout.ToString());
}
+// Refreshes the cached statistics of every tablet replica, at most once per
+// --update_tablet_stats_interval_ms, and marks the replicas collected by
+// TabletReplica::UpdateTabletStats() as dirty so they are reported to the
+// masters.
+//
+// This is called from each heartbeater thread's DoHeartbeat(), hence the
+// try-lock: a caller that loses the race simply returns.
+void TSTabletManager::UpdateTabletStatsIfNecessary() {
+  // Only one thread is allowed to update at the same time.
+  std::unique_lock<rw_spinlock> try_lock(lock_update_, std::try_to_lock);
+  if (!try_lock.owns_lock()) {
+    return;
+  }
+
+  const MonoTime now = MonoTime::Now();
+  if (now < next_update_time_) {
+    return;
+  }
+  next_update_time_ = now +
+      MonoDelta::FromMilliseconds(FLAGS_update_tablet_stats_interval_ms);
+  // The lock only guards the schedule; the update itself runs unlocked.
+  try_lock.unlock();
+
+  // Update the tablet stats and collect the dirty tablets.
+  vector<string> dirty_tablets;
+  vector<scoped_refptr<TabletReplica>> replicas;
+  GetTabletReplicas(&replicas);
+  for (const auto& replica : replicas) {
+    replica->UpdateTabletStats(&dirty_tablets);
+  }
+
+  // MarkTabletsDirty() also triggers a heartbeat ASAP, so skip it when no
+  // statistics changed to avoid forcing a needless extra heartbeat.
+  if (dirty_tablets.empty()) {
+    return;
+  }
+  MarkTabletsDirty(dirty_tablets, "The tablet statistics have been changed");
+}
+
+// Test-only helper: resets 'next_update_time_' to the current time, under
+// 'lock_update_', so that the next UpdateTabletStatsIfNecessary() call is
+// not skipped for being too early.
+void TSTabletManager::SetNextUpdateTimeForTests() {
+ std::lock_guard<rw_spinlock> l(lock_update_);
+ next_update_time_ = MonoTime::Now();
+}
+
TransitionInProgressDeleter::TransitionInProgressDeleter(
TransitionInProgressMap* map, RWMutex* lock, string entry)
: in_progress_(map), lock_(lock), entry_(std::move(entry)) {}
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index d33f3b3..e303ad7 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -198,6 +198,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// TsTabletManager, such as consensus role changes.
void MarkTabletDirty(const std::string& tablet_id, const std::string& reason);
+ // Marks tablets as dirty in batch.
+ void MarkTabletsDirty(const std::vector<std::string>& tablet_ids, const std::string& reason);
+
// Return the number of tablets in RUNNING or BOOTSTRAPPING state.
int GetNumLiveTablets() const;
@@ -235,8 +238,13 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// This method is for use in tests only. See KUDU-2444.
Status WaitForNoTransitionsForTests(const MonoDelta& timeout) const;
+ // Update the tablet statistics if necessary.
+ void UpdateTabletStatsIfNecessary();
+
private:
FRIEND_TEST(TsTabletManagerTest, TestPersistBlocks);
+ FRIEND_TEST(TsTabletManagerTest, TestTabletStatsReports);
+ FRIEND_TEST(TsTabletManagerITest, TestTableStats);
// Flag specified when registering a TabletReplica.
enum RegisterTabletReplicaMode {
@@ -340,6 +348,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// running state.
void InitLocalRaftPeerPB();
+ // Just for tests.
+ void SetNextUpdateTimeForTests();
+
FsManager* const fs_manager_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
@@ -391,6 +402,10 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
FunctionGaugeDetacher metric_detacher_;
+ // Ensures that we only update stats from a single thread at a time.
+ mutable rw_spinlock lock_update_;
+ MonoTime next_update_time_;
+
DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
};
diff --git a/src/kudu/util/metrics.cc b/src/kudu/util/metrics.cc
index f666784..1741c11 100644
--- a/src/kudu/util/metrics.cc
+++ b/src/kudu/util/metrics.cc
@@ -156,7 +156,6 @@ scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate(
return registry->FindOrCreateEntity(this, id, initial_attrs);
}
-
//
// MetricEntity
//
diff --git a/www/table.mustache b/www/table.mustache
index cd12504..c940513 100644
--- a/www/table.mustache
+++ b/www/table.mustache
@@ -32,6 +32,8 @@ under the License.
<tr><td>Version:</td><td>{{version}}</td></tr>
<tr><td>State:</td><td>{{state}} {{#state_msg}}({{.}}){{/state_msg}}</td></tr>
<tr><td>Column Count:</td><td>{{column_count}}</td></tr>
+ <tr><td>Live Row Count:</td><td>{{table_live_row_count}}</td></tr>
+ <tr><td>On-Disk Size (leaders only):</td><td>{{table_disk_size}}</td></tr>
</tbody>
</table>