You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/02/07 07:52:49 UTC
[kudu] branch master updated: [www] add slow scans section
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 3d496deec [www] add slow scans section
3d496deec is described below
commit 3d496deec61b5d44ef133b553e282f5014a8b36f
Author: kedeng <kd...@gmail.com>
AuthorDate: Wed Dec 28 15:25:39 2022 +0800
[www] add slow scans section
We can get the history of completed scans from /scans page right now.
But it is not easy to distinguish slow ones among those.
I introduced an extra section to the scan page to show slow scans
separately. A scan is called 'slow' if it takes more time than defined
by --slow_scanner_threshold_ms.
The number of elements in the slow scans history is limited by
--slow_scan_history_count.
Change-Id: Ibd1dcac8b81c5eefd306e7020c9c52b3f28e603c
Reviewed-on: http://gerrit.cloudera.org:8080/19392
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/client/scan_token-test.cc | 73 ++++++++++-
src/kudu/tserver/scanners.cc | 211 ++++++++++++++++++++++--------
src/kudu/tserver/scanners.h | 50 +++++--
src/kudu/tserver/tablet_server-test.cc | 8 +-
src/kudu/tserver/tablet_server.cc | 6 +-
src/kudu/tserver/tserver_path_handlers.cc | 64 +++++----
www/scans.mustache | 69 +++++++++-
7 files changed, 379 insertions(+), 102 deletions(-)
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index 1cf0ccde0..d46bfc12b 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -73,6 +73,8 @@
#include "kudu/util/test_util.h"
DECLARE_bool(tserver_enforce_access_control);
+DECLARE_int32(scanner_inject_latency_on_each_batch_ms);
+DECLARE_int32(slow_scanner_threshold_ms);
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
@@ -85,6 +87,7 @@ using kudu::master::CatalogManager;
using kudu::master::TabletInfo;
using kudu::tablet::TabletReplica;
using kudu::tserver::MiniTabletServer;
+using kudu::tserver::SharedScanDescriptor;
using std::atomic;
using std::map;
using std::string;
@@ -328,7 +331,7 @@ class ScanTokenTest : public KuduTest {
void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) {
scanner_count_by_ts_uuid->clear();
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- vector<tserver::ScanDescriptor> scanners =
+ vector<SharedScanDescriptor> scanners =
cluster_->mini_tablet_server(i)->server()->scanner_manager()->ListScans();
scanner_count_by_ts_uuid->insert(
{cluster_->mini_tablet_server(i)->server()->instance_pb().permanent_uuid(),
@@ -336,6 +339,20 @@ class ScanTokenTest : public KuduTest {
}
}
+ // Returns the number of entries reported by ListSlowScans() on the tablet
+ // server with index 0. Intended only for clusters with a single tserver.
+ uint32_t GetSlowScansCount() {
+ vector<SharedScanDescriptor> scans =
+ cluster_->mini_tablet_server(0)->server()->scanner_manager()->ListSlowScans();
+ return scans.size();
+ }
+
+ // Returns the number of entries reported by ListScans() on the tablet server
+ // with index 0. Intended only for clusters with a single tserver.
+ // NOTE(review): ScannerManager::ListScans() is declared as listing active
+ // scans as well as recently completed ones, so the result is not strictly
+ // limited to completed scans.
+ uint32_t GetCompletedScansCount() {
+ vector<SharedScanDescriptor> scans =
+ cluster_->mini_tablet_server(0)->server()->scanner_manager()->ListScans();
+ return scans.size();
+ }
+
KuduSchema GetTokenProjectionSchema(const KuduScanToken& token) {
string serialized_token;
CHECK_OK(token.Serialize(&serialized_token));
@@ -349,6 +366,60 @@ class ScanTokenTest : public KuduTest {
unique_ptr<InternalMiniCluster> cluster_;
};
+// Verifies that every scan is counted by ListScans(), while only a scan that
+// exceeds --slow_scanner_threshold_ms is also counted by ListSlowScans().
+TEST_F(ScanTokenTest, SlowScansListTest) {
+ constexpr const char* const kTableName = "slow_scans_show";
+ // Create schema
+ KuduSchema schema;
+ {
+ KuduSchemaBuilder builder;
+ builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+ ASSERT_OK(builder.Build(&schema));
+ }
+
+ // Create table
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(CreateAndOpenTable(kTableName, schema, &table));
+
+ // The scan-count helpers used below only query tserver 0, so the test
+ // requires a single-tserver cluster.
+ ASSERT_EQ(1, cluster_->num_tablet_servers());
+
+ // Create session
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(10000);
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+ // Insert rows
+ for (int i = 0; i < 200; i++) {
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ ASSERT_OK(insert->mutable_row()->SetInt64("key", i));
+ ASSERT_OK(session->Apply(insert.release()));
+ }
+ ASSERT_OK(session->Flush());
+
+ // First scan, with the flags left untouched: it is recorded as a scan, but
+ // not as a slow one.
+ {
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+
+ ASSERT_EQ(200, CountRows(tokens));
+ ASSERT_EQ(1, GetCompletedScansCount());
+ ASSERT_EQ(0, GetSlowScansCount());
+ }
+
+ // Second scan: it is recorded both as a scan and as a slow scan.
+ {
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+
+ // Make this scan slow: inject more latency per batch than the slow scan threshold.
+ FLAGS_scanner_inject_latency_on_each_batch_ms = 50;
+ FLAGS_slow_scanner_threshold_ms = 40;
+ ASSERT_EQ(200, CountRows(tokens));
+ ASSERT_EQ(2, GetCompletedScansCount());
+ ASSERT_EQ(1, GetSlowScansCount());
+ }
+}
+
TEST_F(ScanTokenTest, TestScanTokens) {
// Create schema
KuduSchema schema;
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 30bf4f7be..d68aa161b 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -56,14 +56,27 @@ DEFINE_int32(scanner_ttl_ms, 60000,
TAG_FLAG(scanner_ttl_ms, advanced);
TAG_FLAG(scanner_ttl_ms, runtime);
-DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L *1000L, // 5 seconds
+DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L * 1000L, // 5 seconds
"Number of microseconds in the interval at which we remove expired scanners");
TAG_FLAG(scanner_gc_check_interval_us, hidden);
-DEFINE_int32(scan_history_count, 20,
- "Number of completed scans to keep history for. Determines how many historical "
- "scans will be shown on the tablet server's scans dashboard.");
-TAG_FLAG(scan_history_count, experimental);
+DEFINE_int32(completed_scan_history_count, 10,
+ "Number of latest scans to keep history for. Determines how many historical "
+ "latest scans will be shown on the tablet server's scans dashboard.");
+TAG_FLAG(completed_scan_history_count, experimental);
+
+// TODO(kedeng) : Add flag to control the display of slow scans, and avoid full scanning
+// affecting normal Kudu service without perception.
+DEFINE_int32(slow_scanner_threshold_ms, 60 * 1000L, // 1 minute
+ "Number of milliseconds for the threshold of slow scan.");
+TAG_FLAG(slow_scanner_threshold_ms, advanced);
+TAG_FLAG(slow_scanner_threshold_ms, runtime);
+
+DEFINE_int32(slow_scan_history_count, 10,
+ "Number of slow scans to keep history for. Determines how many historical "
+ "slow scans will be shown on the tablet server's scans dashboard. The "
+ "threshold for a slow scan is defined with --slow_scanner_threshold_ms.");
+TAG_FLAG(slow_scan_history_count, experimental);
DECLARE_int32(rpc_default_keepalive_time_ms);
@@ -108,7 +121,8 @@ namespace tserver {
ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity)
: shutdown_(false),
shutdown_cv_(&shutdown_lock_),
- completed_scans_offset_(0) {
+ completed_scans_offset_(0),
+ slow_scans_offset_(0) {
if (metric_entity) {
metrics_.reset(new ScannerMetrics(metric_entity));
METRIC_active_scanners.InstantiateFunctionGauge(
@@ -119,8 +133,12 @@ ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity)
scanner_maps_.push_back(new ScannerMapStripe());
}
- if (FLAGS_scan_history_count > 0) {
- completed_scans_.reserve(FLAGS_scan_history_count);
+ if (FLAGS_completed_scan_history_count > 0) {
+ completed_scans_.reserve(FLAGS_completed_scan_history_count);
+ }
+
+ if (FLAGS_slow_scan_history_count > 0) {
+ slow_scans_.reserve(FLAGS_slow_scan_history_count);
}
}
@@ -136,14 +154,14 @@ ScannerManager::~ScannerManager() {
STLDeleteElements(&scanner_maps_);
}
-Status ScannerManager::StartRemovalThread() {
- RETURN_NOT_OK(Thread::Create("scanners", "removal_thread",
- [this]() { this->RunRemovalThread(); },
+Status ScannerManager::StartCollectAndRemovalThread() {
+ RETURN_NOT_OK(Thread::Create("scanners", "collect_and_removal_thread",
+ [this]() { this->RunCollectAndRemovalThread(); },
&removal_thread_));
return Status::OK();
}
-void ScannerManager::RunRemovalThread() {
+void ScannerManager::RunCollectAndRemovalThread() {
while (true) {
// Loop until we are shutdown.
{
@@ -153,6 +171,7 @@ void ScannerManager::RunRemovalThread() {
}
shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us));
}
+ CollectSlowScanners();
RemoveExpiredScanners();
}
}
@@ -205,7 +224,7 @@ Status ScannerManager::LookupScanner(const string& scanner_id,
}
bool ScannerManager::UnregisterScanner(const string& scanner_id) {
- ScanDescriptor descriptor;
+ SharedScanDescriptor descriptor;
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
{
std::lock_guard<RWMutex> l(stripe.lock_);
@@ -217,7 +236,7 @@ bool ScannerManager::UnregisterScanner(const string& scanner_id) {
bool is_initted = it->second->is_initted();
if (is_initted) {
descriptor = it->second->Descriptor();
- descriptor.state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete;
+ descriptor->state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete;
}
stripe.scanners_by_id_.erase(it);
if (!is_initted) {
@@ -225,8 +244,19 @@ bool ScannerManager::UnregisterScanner(const string& scanner_id) {
}
}
- std::lock_guard<RWMutex> l(completed_scans_lock_);
- RecordCompletedScanUnlocked(std::move(descriptor));
+ {
+ std::lock_guard<percpu_rwlock> l(completed_scans_lock_);
+ RecordCompletedScanUnlocked(descriptor);
+ }
+
+ {
+ const MonoTime start_time = descriptor->start_time;
+ if (start_time + MonoDelta::FromMilliseconds(FLAGS_slow_scanner_threshold_ms)
+ < MonoTime::Now()) {
+ std::lock_guard<percpu_rwlock> l(slow_scans_lock_);
+ RecordSlowScanUnlocked(descriptor);
+ }
+ }
return true;
}
@@ -248,46 +278,110 @@ void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) const {
}
}
-vector<ScanDescriptor> ScannerManager::ListScans() const {
- unordered_map<string, ScanDescriptor> scans;
+vector<SharedScanDescriptor> ScannerManager::ListScans() const {
+ unordered_map<string, SharedScanDescriptor> scans;
for (const ScannerMapStripe* stripe : scanner_maps_) {
shared_lock<RWMutex> l(stripe->lock_);
for (const auto& se : stripe->scanners_by_id_) {
if (se.second->is_initted()) {
- ScanDescriptor desc = se.second->Descriptor();
- desc.state = ScanState::kActive;
+ SharedScanDescriptor desc = se.second->Descriptor();
+ desc->state = ScanState::kActive;
EmplaceOrDie(&scans, se.first, std::move(desc));
}
}
}
{
- shared_lock<RWMutex> l(completed_scans_lock_);
+ kudu::shared_lock<rw_spinlock> l(completed_scans_lock_.get_lock());
// A scanner in 'scans' may have completed between the above loop and here.
// As we'd rather have the finalized descriptor of the completed scan,
// update over the old descriptor in this case.
for (const auto& scan : completed_scans_) {
- InsertOrUpdate(&scans, scan.scanner_id, scan);
+ InsertOrUpdate(&scans, scan->scanner_id, scan);
}
}
- vector<ScanDescriptor> ret;
+ vector<SharedScanDescriptor> ret;
ret.reserve(scans.size());
AppendValuesFromMap(scans, &ret);
// Sort oldest to newest, so that the ordering is consistent across calls.
- std::sort(ret.begin(), ret.end(), [] (const ScanDescriptor& a, const ScanDescriptor& b) {
- return a.start_time > b.start_time;
+ std::sort(ret.begin(), ret.end(), [] (const SharedScanDescriptor& a,
+ const SharedScanDescriptor& b) {
+ return a->start_time > b->start_time;
});
return ret;
}
+// Returns the descriptors currently held in the slow scans FIFO, keyed down to
+// one descriptor per scanner ID and ordered by descending start time.
+vector<SharedScanDescriptor> ScannerManager::ListSlowScans() const {
+ // Copy the FIFO into a map keyed by scanner ID while holding the lock in
+ // shared mode, so that each scanner ID appears only once in the result.
+ unordered_map<string, SharedScanDescriptor> scans;
+ {
+ kudu::shared_lock<rw_spinlock> l(slow_scans_lock_.get_lock());
+ for (const auto& scan : slow_scans_) {
+ InsertOrUpdate(&scans, scan->scanner_id, scan);
+ }
+ }
+
+ vector<SharedScanDescriptor> ret;
+ ret.reserve(scans.size());
+ AppendValuesFromMap(scans, &ret);
+
+ // Sort by descending start time (the most recently started scan comes first),
+ // so that the ordering is consistent across calls.
+ std::sort(ret.begin(), ret.end(), [] (const SharedScanDescriptor& a,
+ const SharedScanDescriptor& b) {
+ return a->start_time > b->start_time;
+ });
+
+ return ret;
+}
+
+// Walks all registered scanners and records in the slow scans FIFO every
+// initialized scanner that has been open for longer than
+// --slow_scanner_threshold_ms. A scanner that is already present in the FIFO
+// (matched by scanner ID) has its entry refreshed instead of being added again.
+void ScannerManager::CollectSlowScanners() {
+  const MonoTime now = MonoTime::Now();
+  // The flag is runtime-modifiable: read it once so that the whole pass uses
+  // the same threshold.
+  const int32_t slow_scanner_threshold = FLAGS_slow_scanner_threshold_ms;
+  const MonoDelta threshold = MonoDelta::FromMilliseconds(slow_scanner_threshold);
+
+  vector<SharedScanDescriptor> descriptors;
+  for (const ScannerMapStripe* stripe : scanner_maps_) {
+    // The stripe is only read here, so a shared lock is sufficient, the same
+    // way ListScans() iterates over the stripes.
+    shared_lock<RWMutex> l(stripe->lock_);
+    for (const auto& entry : stripe->scanners_by_id_) {
+      const SharedScanner& scanner = entry.second;
+      if (!scanner->is_initted()) {
+        // Ignore uninitialized scans.
+        continue;
+      }
+      const MonoTime start_time = scanner->start_time();
+      if (start_time + threshold >= now) {
+        // Not slow (yet).
+        continue;
+      }
+
+      const MonoDelta delta_time = now - start_time - threshold;
+      // TODO(kedeng) : Add flag to control whether to print this log.
+      LOG(INFO) << Substitute(
+          "Slow scanner id: $0, of tablet $1, "
+          "exceed the time threshold $2 ms for $3 ms.",
+          entry.first,
+          scanner->tablet_id(),
+          slow_scanner_threshold,
+          delta_time.ToMilliseconds());
+      SharedScanDescriptor descriptor = scanner->Descriptor();
+      // The scanner is still registered, so report it as active, mirroring
+      // what ListScans() does for live scanners.
+      descriptor->state = ScanState::kActive;
+      descriptors.emplace_back(std::move(descriptor));
+    }
+  }
+
+  std::lock_guard<percpu_rwlock> l(slow_scans_lock_);
+  for (const auto& descriptor : descriptors) {
+    // Scanner::Descriptor() allocates a new ScanDescriptor on every call, so
+    // comparing the ref-counted pointers themselves would never find a match
+    // and each pass would push one more copy of the same scanner into the
+    // bounded FIFO, evicting other slow scans. Match by scanner ID instead.
+    const auto it = std::find_if(
+        slow_scans_.begin(), slow_scans_.end(),
+        [&descriptor](const SharedScanDescriptor& recorded) {
+          return recorded->scanner_id == descriptor->scanner_id;
+        });
+    if (it == slow_scans_.end()) {
+      RecordSlowScanUnlocked(descriptor);
+    } else {
+      // Keep the slot, but refresh it with the most recent statistics.
+      *it = descriptor;
+    }
+  }
+}
+
void ScannerManager::RemoveExpiredScanners() {
MonoDelta scanner_ttl = MonoDelta::FromMilliseconds(FLAGS_scanner_ttl_ms);
const MonoTime now = MonoTime::Now();
- vector<ScanDescriptor> descriptors;
+ vector<SharedScanDescriptor> descriptors;
for (ScannerMapStripe* stripe : scanner_maps_) {
std::lock_guard<RWMutex> l(stripe->lock_);
for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end();) {
@@ -316,28 +410,39 @@ void ScannerManager::RemoveExpiredScanners() {
}
}
- std::lock_guard<RWMutex> l(completed_scans_lock_);
+ std::lock_guard<percpu_rwlock> l(completed_scans_lock_);
for (auto& descriptor : descriptors) {
- descriptor.last_access_time = now;
- descriptor.state = ScanState::kExpired;
- RecordCompletedScanUnlocked(std::move(descriptor));
+ descriptor->last_access_time = now;
+ descriptor->state = ScanState::kExpired;
+ RecordCompletedScanUnlocked(descriptor);
}
}
-void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
- if (completed_scans_.capacity() == 0) {
+void ScannerManager::CircularUpdateRecordInFifo(vector<SharedScanDescriptor>& scans_vec,
+ size_t& scans_offset,
+ const SharedScanDescriptor& descriptor) {
+ if (scans_vec.capacity() == 0) {
return;
}
- if (completed_scans_.size() == completed_scans_.capacity()) {
- completed_scans_[completed_scans_offset_++] = std::move(descriptor);
- if (completed_scans_offset_ == completed_scans_.capacity()) {
- completed_scans_offset_ = 0;
+
+ if (scans_vec.size() == scans_vec.capacity()) {
+ scans_vec[scans_offset++] = descriptor;
+ if (scans_offset == scans_vec.capacity()) {
+ scans_offset = 0;
}
} else {
- completed_scans_.emplace_back(std::move(descriptor));
+ scans_vec.emplace_back(descriptor);
}
}
+// Adds 'descriptor' to the completed scans FIFO, overwriting the oldest entry
+// once the FIFO is full. The callers in this file invoke it while holding
+// completed_scans_lock_ exclusively.
+void ScannerManager::RecordCompletedScanUnlocked(const SharedScanDescriptor& descriptor) {
+ return CircularUpdateRecordInFifo(completed_scans_, completed_scans_offset_, descriptor);
+}
+
+// Adds 'descriptor' to the slow scans FIFO, overwriting the oldest entry once
+// the FIFO is full. The callers in this file invoke it while holding
+// slow_scans_lock_ exclusively.
+void ScannerManager::RecordSlowScanUnlocked(const SharedScanDescriptor& descriptor) {
+ return CircularUpdateRecordInFifo(slow_scans_, slow_scans_offset_, descriptor);
+}
+
const std::string Scanner::kNullTabletId = "null tablet";
Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
@@ -407,39 +512,39 @@ IteratorStats Scanner::UpdateStatsAndGetDelta() {
return delta_stats;
}
-ScanDescriptor Scanner::Descriptor() const {
+SharedScanDescriptor Scanner::Descriptor() const {
// Ignore non-initialized scans. The initializing state is transient, and
// handling it correctly is complicated. Since the scanner is initialized we
// can assume iter_, spec_, and client_projection_schema_ are valid
// pointers.
CHECK(is_initted());
- ScanDescriptor descriptor;
- descriptor.tablet_id = tablet_id();
- descriptor.scanner_id = id();
- descriptor.remote_user = remote_user();
- descriptor.start_time = start_time_;
+ SharedScanDescriptor descriptor(new ScanDescriptor);
+ descriptor->tablet_id = tablet_id();
+ descriptor->scanner_id = id();
+ descriptor->remote_user = remote_user();
+ descriptor->start_time = start_time_;
for (const auto& column : client_projection_schema_->columns()) {
- descriptor.projected_columns.emplace_back(column.name());
+ descriptor->projected_columns.emplace_back(column.name());
}
const auto& tablet_metadata = tablet_replica_->tablet_metadata();
- descriptor.table_name = tablet_metadata->table_name();
+ descriptor->table_name = tablet_metadata->table_name();
SchemaPtr schema_ptr = tablet_metadata->schema();
if (spec().lower_bound_key()) {
- descriptor.predicates.emplace_back(
+ descriptor->predicates.emplace_back(
Substitute("PRIMARY KEY >= $0", KUDU_REDACT(
spec().lower_bound_key()->Stringify(*schema_ptr))));
}
if (spec().exclusive_upper_bound_key()) {
- descriptor.predicates.emplace_back(
+ descriptor->predicates.emplace_back(
Substitute("PRIMARY KEY < $0", KUDU_REDACT(
spec().exclusive_upper_bound_key()->Stringify(*schema_ptr))));
}
for (const auto& predicate : spec().predicates()) {
- descriptor.predicates.emplace_back(predicate.second.ToString());
+ descriptor->predicates.emplace_back(predicate.second.ToString());
}
vector<IteratorStats> iterator_stats;
@@ -447,13 +552,13 @@ ScanDescriptor Scanner::Descriptor() const {
DCHECK_EQ(iterator_stats.size(), iter_->schema().num_columns());
for (int col_idx = 0; col_idx < iterator_stats.size(); col_idx++) {
- descriptor.iterator_stats.emplace_back(iter_->schema().column(col_idx).name(),
- iterator_stats[col_idx]);
+ descriptor->iterator_stats.emplace_back(iter_->schema().column(col_idx).name(),
+ iterator_stats[col_idx]);
}
- descriptor.last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_);
- descriptor.last_access_time = last_access_time_.load(std::memory_order_relaxed);
- descriptor.cpu_times = cpu_times();
+ descriptor->last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_);
+ descriptor->last_access_time = last_access_time_.load(std::memory_order_relaxed);
+ descriptor->cpu_times = cpu_times();
return descriptor;
}
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 5fa2c1a15..531a33f68 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -38,6 +38,7 @@
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/condition_variable.h"
+#include "kudu/util/locks.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -62,6 +63,7 @@ struct ScanDescriptor;
struct ScannerMetrics;
typedef std::shared_ptr<Scanner> SharedScanner;
+typedef scoped_refptr<ScanDescriptor> SharedScanDescriptor;
// Manages the live scanners within a Tablet Server.
//
@@ -76,8 +78,8 @@ class ScannerManager {
explicit ScannerManager(const scoped_refptr<MetricEntity>& metric_entity);
~ScannerManager();
- // Starts the expired scanner removal thread.
- Status StartRemovalThread();
+ // Starts the slow scans collect and the expired scanner removal thread.
+ Status StartCollectAndRemovalThread();
// Create a new scanner with a unique ID, inserting it into the map. Further
// lookups for the scanner must provide the username associated with
@@ -111,11 +113,21 @@ class ScannerManager {
void ListScanners(std::vector<SharedScanner>* scanners) const;
// List active and recently completed scans.
- std::vector<ScanDescriptor> ListScans() const;
+ std::vector<SharedScanDescriptor> ListScans() const;
+
+ // List recent slow scans.
+ // A scan is 'slow' if it takes more than --slow_scanner_threshold_ms to
+ // complete.
+ // The number of elements in the result vector is limited by
+ // --slow_scan_history_count.
+ std::vector<SharedScanDescriptor> ListSlowScans() const;
// Iterate through scanners and remove any which are past their TTL.
void RemoveExpiredScanners();
+ // Collect slow scanners whose scan times exceed the threshold.
+ void CollectSlowScanners();
+
private:
FRIEND_TEST(ScannerTest, TestExpire);
@@ -132,13 +144,23 @@ class ScannerManager {
ScannerMap scanners_by_id_;
};
- // Periodically call RemoveExpiredScanners().
- void RunRemovalThread();
+ // Periodically call CollectSlowScanners() and RemoveExpiredScanners().
+ void RunCollectAndRemovalThread();
ScannerMapStripe& GetStripeByScannerId(const std::string& scanner_id);
// Adds the scan descriptor to the completed scans FIFO.
- void RecordCompletedScanUnlocked(ScanDescriptor descriptor);
+ void RecordCompletedScanUnlocked(const SharedScanDescriptor& descriptor);
+
+ // Adds the scan descriptor to the slow scans FIFO.
+ void RecordSlowScanUnlocked(const SharedScanDescriptor& descriptor);
+
+ // Update the record in the vector with the fifo method.
+ // If the container is full when updating, update the data inserted first.
+ // The number of elements in the vector is init at the vector initialization.
+ static void CircularUpdateRecordInFifo(std::vector<SharedScanDescriptor>& scans_vec,
+ size_t& scans_offset,
+ const SharedScanDescriptor& descriptor);
// (Optional) scanner metrics for this instance.
std::unique_ptr<ScannerMetrics> metrics_;
@@ -152,10 +174,15 @@ class ScannerManager {
std::vector<ScannerMapStripe*> scanner_maps_;
// completed_scans_ is a FIFO ring buffer of completed scans.
- mutable RWMutex completed_scans_lock_;
- std::vector<ScanDescriptor> completed_scans_;
+ mutable percpu_rwlock completed_scans_lock_;
+ std::vector<SharedScanDescriptor> completed_scans_;
size_t completed_scans_offset_;
+ // slow_scans_ is a FIFO ring buffer of slow scans.
+ mutable percpu_rwlock slow_scans_lock_;
+ std::vector<SharedScanDescriptor> slow_scans_;
+ size_t slow_scans_offset_;
+
// Generator for scanner IDs.
ObjectIdGenerator oid_generator_;
@@ -358,7 +385,7 @@ class Scanner {
// Does not require the AccessLock.
//
// REQUIRES: is_initted() must be true.
- ScanDescriptor Descriptor() const;
+ SharedScanDescriptor Descriptor() const;
// Returns the amount of CPU time accounted to this scanner.
// Does not require the AccessLock.
@@ -456,7 +483,10 @@ enum class ScanState {
// ScanDescriptor holds information about a scan. The ScanDescriptor can outlive
// the associated scanner without holding open any of the scanner's resources.
-struct ScanDescriptor {
+//
+// These are ref-counted so that ScanDescriptor is copyable, with this we can avoid
+// duplicating data for elements in slow_scans_ and completed_scans_ in ScannerManager.
+struct ScanDescriptor : public RefCountedThreadSafe<ScanDescriptor> {
// The tablet ID.
std::string tablet_id;
// The scanner ID.
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 9d2787505..7ddbb3836 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -3195,10 +3195,10 @@ TEST_F(TabletServerTest, TestScanWithSimplifiablePredicates) {
auto scan_descriptors = mini_server_->server()->scanner_manager()->ListScans();
ASSERT_EQ(1, projection.columns().size());
ASSERT_EQ(1, scan_descriptors.size());
- ASSERT_EQ(projection.columns().size(), scan_descriptors[0].projected_columns.size());
- ASSERT_EQ(2, scan_descriptors[0].predicates.size());
- ASSERT_EQ(projection.columns().size(), scan_descriptors[0].iterator_stats.size());
- ASSERT_EQ(projection.column(0).name(), scan_descriptors[0].iterator_stats[0].first);
+ ASSERT_EQ(projection.columns().size(), scan_descriptors[0]->projected_columns.size());
+ ASSERT_EQ(2, scan_descriptors[0]->predicates.size());
+ ASSERT_EQ(projection.columns().size(), scan_descriptors[0]->iterator_stats.size());
+ ASSERT_EQ(projection.column(0).name(), scan_descriptors[0]->iterator_stats[0].first);
}
// Drain all the rows from the scanner.
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 43f60140a..694a4710f 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -20,6 +20,8 @@
#include <functional>
#include <memory>
#include <ostream>
+#include <type_traits>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -117,8 +119,8 @@ Status TabletServer::Init() {
RETURN_NOT_OK_PREPEND(tablet_manager_->Init(start_tablets, tablets_processed,
total_tablets),
"Could not init Tablet Manager");
- RETURN_NOT_OK_PREPEND(scanner_manager_->StartRemovalThread(),
- "Could not start expired Scanner removal thread");
+ RETURN_NOT_OK_PREPEND(scanner_manager_->StartCollectAndRemovalThread(),
+ "Could not start slow scans collect and expired Scanner removal thread");
state_ = kInitialized;
return Status::OK();
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index abfaef3f6..37e925650 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -96,8 +96,8 @@ using std::string;
using std::vector;
using strings::Substitute;
-DECLARE_int32(scan_history_count);
-
+DECLARE_int32(completed_scan_history_count);
+DECLARE_int32(slow_scan_history_count);
namespace kudu {
namespace tserver {
@@ -504,29 +504,29 @@ const char* ScanStateToString(const ScanState& scan_state) {
}
// Formats the scan descriptor's pseudo-SQL query string as HTML.
-string ScanQueryHtml(const ScanDescriptor& scan) {
+string ScanQueryHtml(const SharedScanDescriptor& scan) {
string query = "<b>SELECT</b> ";
- if (scan.projected_columns.empty()) {
+ if (scan->projected_columns.empty()) {
query.append("COUNT(*)");
} else {
- query.append(JoinMapped(scan.projected_columns, EscapeForHtmlToString, ",<br> "));
+ query.append(JoinMapped(scan->projected_columns, EscapeForHtmlToString, ",<br> "));
}
query.append("<br> <b>FROM</b> ");
- if (scan.table_name.empty()) {
+ if (scan->table_name.empty()) {
query.append("<unknown>");
} else {
- query.append(EscapeForHtmlToString(scan.table_name));
+ query.append(EscapeForHtmlToString(scan->table_name));
}
- if (!scan.predicates.empty()) {
+ if (!scan->predicates.empty()) {
query.append("<br> <b>WHERE</b> ");
- query.append(JoinMapped(scan.predicates, EscapeForHtmlToString, "<br> <b>AND</b> "));
+ query.append(JoinMapped(scan->predicates, EscapeForHtmlToString, "<br> <b>AND</b> "));
}
return query;
}
-void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) {
+void IteratorStatsToJson(const SharedScanDescriptor& scan, EasyJson* json) {
auto fill_stats = [] (EasyJson& row, const string& column, const IteratorStats& stats) {
row["column"] = column;
@@ -541,7 +541,7 @@ void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) {
};
IteratorStats total_stats;
- for (const auto& column : scan.iterator_stats) {
+ for (const auto& column : scan->iterator_stats) {
EasyJson row = json->PushBack(EasyJson::kObject);
fill_stats(row, column.first, column.second);
total_stats += column.second;
@@ -551,28 +551,28 @@ void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) {
fill_stats(total_row, "total", total_stats);
}
-void ScanToJson(const ScanDescriptor& scan, EasyJson* json) {
+void ScanToJson(const SharedScanDescriptor& scan, EasyJson* json) {
MonoTime now = MonoTime::Now();
MonoDelta duration;
- if (scan.state == ScanState::kActive) {
- duration = now - scan.start_time;
+ if (scan->state == ScanState::kActive) {
+ duration = now - scan->start_time;
} else {
- duration = scan.last_access_time - scan.start_time;
+ duration = scan->last_access_time - scan->start_time;
}
- MonoDelta time_since_start = now - scan.start_time;
+ MonoDelta time_since_start = now - scan->start_time;
- json->Set("tablet_id", scan.tablet_id);
- json->Set("scanner_id", scan.scanner_id);
- json->Set("state", ScanStateToString(scan.state));
+ json->Set("tablet_id", scan->tablet_id);
+ json->Set("scanner_id", scan->scanner_id);
+ json->Set("state", ScanStateToString(scan->state));
json->Set("query", ScanQueryHtml(scan));
- json->Set("requestor", scan.remote_user.username());
+ json->Set("requestor", scan->remote_user.username());
json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds()));
- json->Set("num_round_trips", scan.last_call_seq_id);
+ json->Set("num_round_trips", scan->last_call_seq_id);
json->Set("time_since_start",
HumanReadableElapsedTime::ToShortString(time_since_start.ToSeconds()));
- const auto& cpu_times = scan.cpu_times;
+ const auto& cpu_times = scan->cpu_times;
json->Set("wall_secs",
HumanReadableElapsedTime::ToShortString(cpu_times.wall_seconds()));
json->Set("user_secs",
@@ -598,14 +598,24 @@ const char* kLongTimingTitle = "wall time, user cpu time, and system cpu time "
void TabletServerPathHandlers::HandleScansPage(const Webserver::WebRequest& /*req*/,
Webserver::WebResponse* resp) {
EasyJson* output = &resp->output;
- (*output)["scan_history_count"] = FLAGS_scan_history_count;
+ (*output)["completed_scan_history_count"] = FLAGS_completed_scan_history_count;
output->Set("timing_title", kLongTimingTitle);
- EasyJson scans = output->Set("scans", EasyJson::kArray);
- vector<ScanDescriptor> descriptors = tserver_->scanner_manager()->ListScans();
+ EasyJson completed_scans = output->Set("completed_scans", EasyJson::kArray);
+ vector<SharedScanDescriptor> descriptors = tserver_->scanner_manager()->ListScans();
for (const auto& descriptor : descriptors) {
- EasyJson scan = scans.PushBack(EasyJson::kObject);
- ScanToJson(descriptor, &scan);
+ EasyJson completed_scan = completed_scans.PushBack(EasyJson::kObject);
+ ScanToJson(descriptor, &completed_scan);
+ }
+
+ (*output)["slow_scan_history_count"] = FLAGS_slow_scan_history_count;
+ EasyJson slow_scans = output->Set("slow_scans", EasyJson::kArray);
+ vector<SharedScanDescriptor> slow_descriptors =
+ tserver_->scanner_manager()->ListSlowScans();
+
+ for (const auto& slow_descriptor : slow_descriptors) {
+ EasyJson slow_scan = slow_scans.PushBack(EasyJson::kObject);
+ ScanToJson(slow_descriptor, &slow_scan);
}
}
diff --git a/www/scans.mustache b/www/scans.mustache
index 064ec09a6..f567f9629 100644
--- a/www/scans.mustache
+++ b/www/scans.mustache
@@ -17,9 +17,9 @@ specific language governing permissions and limitations
under the License.
}}{{#raw}}{{{.}}}{{/raw}}{{^raw}}
-<h1>Scans</h1>
-<p class="lead">This page lists all running scans and last {{scan_history_count}} completed scans.
-The number of historical scans is configurable with the <samp>--scan_history_count</samp> flag.</p>
+<h1>Completed Scans</h1>
+<p class="lead">This table lists all running scans and last {{completed_scan_history_count}} completed scans.
+The number of historical completed scans is configurable with the <samp>--completed_scan_history_count</samp> flag.</p>
<table class="table table-striped">
<thead>
<tr>
@@ -36,7 +36,7 @@ The number of historical scans is configurable with the <samp>--scan_history_cou
</tr>
</thead>
<tbody>
- {{#scans}}
+ {{#completed_scans}}
<tr>
<td><a href="{{base_url}}/tablet?id={{tablet_id}}"><samp>{{tablet_id}}</samp></a></td>
<td><samp>{{scanner_id}}</samp></td>
@@ -72,7 +72,66 @@ The number of historical scans is configurable with the <samp>--scan_history_cou
</table>
</td>
</tr>
- {{/scans}}
+ {{/completed_scans}}
+ </tbody>
+</table>
+
+<h1>Slow Scans</h1>
+<p class="lead">This table lists the last {{slow_scan_history_count}} slow scans, including slow scans that are still running.
+The number of historical slow scans is configurable with the <samp>--slow_scan_history_count</samp> flag.</p>
+<table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Tablet id</th>
+ <th>Scanner id</th>
+ <th>State</th>
+ <th title="pseudo-SQL query description">Query</th>
+ <th>Requestor</th>
+ <th title="amount of time that the scanner was/has been open on the server">Duration</th>
+ <th title="number of round trips">Round trips</th>
+ <th title="elapsed time since the scan started">Time since start</th>
+ <th title="{{timing_title}}">Timing</th>
+ <th>Column Stats</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#slow_scans}}
+ <tr>
+ <td><a href="{{base_url}}/tablet?id={{tablet_id}}"><samp>{{tablet_id}}</samp></a></td>
+ <td><samp>{{scanner_id}}</samp></td>
+ <td>{{state}}</td>
+ {{! The query string is pre-formatted HTML, so don't escape it (triple-brace). }}
+ <td><pre>{{{query}}}</pre></td>
+ <td><samp>{{requestor}}</samp></td>
+ <td title="{{duration_title}}">{{duration}}</td>
+ <td>{{num_round_trips}}</td>
+ <td title="{{time_since_start_title}}">{{time_since_start}}</td>
+ <td>real: {{wall_secs}} user: {{user_secs}} sys: {{sys_secs}}</td>
+
+ <td>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>column</th>
+ <th title="cells read from the column (disk or cache), exclusive of the MRS">cells read</th>
+ <th title="bytes read from the column (disk or cache), exclusive of the MRS">bytes read</th>
+ <th title="CFile data blocks read from the column (disk or cache)">blocks read</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#stats}}
+ <tr>
+ <td>{{column}}</td>
+ <td title="{{cells_read_title}}">{{cells_read}}</td>
+ <td title="{{bytes_read_title}}">{{bytes_read}}</td>
+ <td title="{{blocks_read_title}}">{{blocks_read}}</td>
+ </tr>
+ {{/stats}}
+ </tbody>
+ </table>
+ </td>
+ </tr>
+ {{/slow_scans}}
</tbody>
</table>
{{/raw}}