You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/10/17 15:23:20 UTC
[3/5] kudu git commit: Implement per-tablet active scanner metric
Implement per-tablet active scanner metric
The active_scanners metric would be useful scoped per-tablet.
This patch exposes it as tablet_active_scanners.
It also adds coverage of scanner expiration in tablet_server-test.
Change-Id: I3b2d85f78c655e02654f8581e9521452b149b7d5
Reviewed-on: http://gerrit.cloudera.org:8080/8228
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/60ea4a64
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/60ea4a64
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/60ea4a64
Branch: refs/heads/master
Commit: 60ea4a64366d46a7b2e86aef5c9b38c32e6af15b
Parents: 33c0669
Author: Will Berkeley <wd...@apache.org>
Authored: Mon Oct 2 15:50:33 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Oct 17 04:39:03 2017 +0000
----------------------------------------------------------------------
src/kudu/tablet/tablet_metrics.cc | 22 ++++++----
src/kudu/tablet/tablet_metrics.h | 4 ++
src/kudu/tserver/scanner_metrics.cc | 4 +-
src/kudu/tserver/scanners.cc | 16 ++++++-
src/kudu/tserver/tablet_server-test.cc | 67 +++++++++++++++++++++++++++--
5 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/60ea4a64/src/kudu/tablet/tablet_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index b5469e9..a3b6b41 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -42,6 +42,15 @@ METRIC_DEFINE_counter(tablet, rows_deleted, "Rows Deleted",
kudu::MetricUnit::kRows,
"Number of row delete operations performed on this tablet since service start");
+METRIC_DEFINE_counter(tablet, insertions_failed_dup_key, "Duplicate Key Inserts",
+ kudu::MetricUnit::kRows,
+ "Number of inserts which failed because the key already existed");
+
+METRIC_DEFINE_counter(tablet, upserts_as_updates, "Upserts converted into updates",
+ kudu::MetricUnit::kRows,
+ "Number of upserts which were applied as updates because the key already "
+ "existed.");
+
METRIC_DEFINE_counter(tablet, scanner_rows_returned, "Scanner Rows Returned",
kudu::MetricUnit::kRows,
"Number of rows returned by scanners to clients. This count "
@@ -59,7 +68,6 @@ METRIC_DEFINE_counter(tablet, scanner_bytes_returned, "Scanner Bytes Returned",
"for consumption by clients, and thus is not "
"a reflection of the amount of work being done by scanners.");
-
METRIC_DEFINE_counter(tablet, scanner_rows_scanned, "Scanner Rows Scanned",
kudu::MetricUnit::kRows,
"Number of rows processed by scan requests. This is measured "
@@ -90,17 +98,12 @@ METRIC_DEFINE_counter(tablet, scanner_bytes_scanned_from_disk, "Scanner Bytes Sc
"and does not include data read from in-memory stores. However, it"
"includes both cache misses and cache hits.");
-
-METRIC_DEFINE_counter(tablet, insertions_failed_dup_key, "Duplicate Key Inserts",
- kudu::MetricUnit::kRows,
- "Number of inserts which failed because the key already existed");
-METRIC_DEFINE_counter(tablet, upserts_as_updates, "Upserts converted into updates",
- kudu::MetricUnit::kRows,
- "Number of upserts which were applied as updates because the key already "
- "existed.");
METRIC_DEFINE_counter(tablet, scans_started, "Scans Started",
kudu::MetricUnit::kScanners,
"Number of scanners which have been started on this tablet");
+METRIC_DEFINE_gauge_size(tablet, tablet_active_scanners, "Active Scanners",
+ kudu::MetricUnit::kScanners,
+ "Number of scanners that are currently active on this tablet");
METRIC_DEFINE_counter(tablet, bloom_lookups, "Bloom Filter Lookups",
kudu::MetricUnit::kProbes,
@@ -275,6 +278,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
MINIT(scanner_cells_scanned_from_disk),
MINIT(scanner_bytes_scanned_from_disk),
MINIT(scans_started),
+ GINIT(tablet_active_scanners),
MINIT(bloom_lookups),
MINIT(key_file_lookups),
MINIT(delta_file_lookups),
http://git-wip-us.apache.org/repos/asf/kudu/blob/60ea4a64/src/kudu/tablet/tablet_metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 6fa5a64..9fe87b7 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -18,6 +18,7 @@
#define KUDU_TABLET_TABLET_METRICS_H
#include <cstdint>
+#include <stddef.h>
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/metrics.h"
@@ -49,6 +50,8 @@ struct TabletMetrics {
scoped_refptr<Counter> rows_deleted;
scoped_refptr<Counter> insertions_failed_dup_key;
scoped_refptr<Counter> upserts_as_updates;
+
+ // Scanner metrics
scoped_refptr<Counter> scanner_rows_returned;
scoped_refptr<Counter> scanner_cells_returned;
scoped_refptr<Counter> scanner_bytes_returned;
@@ -56,6 +59,7 @@ struct TabletMetrics {
scoped_refptr<Counter> scanner_cells_scanned_from_disk;
scoped_refptr<Counter> scanner_bytes_scanned_from_disk;
scoped_refptr<Counter> scans_started;
+ scoped_refptr<AtomicGauge<size_t>> tablet_active_scanners;
// Probe stats
scoped_refptr<Counter> bloom_lookups;
http://git-wip-us.apache.org/repos/asf/kudu/blob/60ea4a64/src/kudu/tserver/scanner_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanner_metrics.cc b/src/kudu/tserver/scanner_metrics.cc
index 80f189d..503290e 100644
--- a/src/kudu/tserver/scanner_metrics.cc
+++ b/src/kudu/tserver/scanner_metrics.cc
@@ -22,12 +22,12 @@
METRIC_DEFINE_counter(server, scanners_expired,
"Scanners Expired",
kudu::MetricUnit::kScanners,
- "Number of scanners that have expired since service start");
+ "Number of scanners that have expired due to inactivity since service start");
METRIC_DEFINE_histogram(server, scanner_duration,
"Scanner Duration",
kudu::MetricUnit::kMicroseconds,
- "Histogram of the duration of active scanners on this tablet.",
+ "Histogram of the duration of active scanners on this server",
60000000LU, 2);
namespace kudu {
http://git-wip-us.apache.org/repos/asf/kudu/blob/60ea4a64/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 92a955b..846c7b8 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -31,6 +31,8 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tserver/scanner_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
@@ -45,8 +47,6 @@ DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L *1000L, // 5 seconds
"Number of microseconds in the interval at which we remove expired scanners");
TAG_FLAG(scanner_gc_check_interval_us, hidden);
-// TODO: would be better to scope this at a tablet level instead of
-// server level.
METRIC_DEFINE_gauge_size(server, active_scanners,
"Active Scanners",
kudu::MetricUnit::kScanners,
@@ -212,10 +212,22 @@ Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
metrics_(metrics),
arena_(1024, 1024 * 1024),
row_format_flags_(row_format_flags) {
+ if (tablet_replica_) {
+ auto tablet = tablet_replica->shared_tablet();
+ if (tablet && tablet->metrics()) {
+ tablet->metrics()->tablet_active_scanners->Increment();
+ }
+ }
UpdateAccessTime();
}
Scanner::~Scanner() {
+ if (tablet_replica_) {
+ auto tablet = tablet_replica_->shared_tablet();
+ if (tablet && tablet->metrics()) {
+ tablet->metrics()->tablet_active_scanners->IncrementBy(-1);
+ }
+ }
if (metrics_) {
metrics_->SubmitScannerDuration(start_time_);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/60ea4a64/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 9405f5a..3406d7a 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -22,6 +22,7 @@
#include <cstdint>
#include <memory>
#include <sstream>
+#include <stddef.h>
#include <string>
#include <utility>
#include <vector>
@@ -50,6 +51,7 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/callback.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
@@ -124,13 +126,18 @@ DEFINE_int32(single_threaded_insert_latency_bench_insert_rows, 1000,
DECLARE_bool(fail_dns_resolution);
DECLARE_int32(metrics_retirement_age_ms);
DECLARE_int32(scanner_batch_size_rows);
+DECLARE_int32(scanner_gc_check_interval_us);
+DECLARE_int32(scanner_ttl_ms);
DECLARE_string(block_manager);
// Declare these metrics prototypes for simpler unit testing of their behavior.
METRIC_DECLARE_counter(rows_inserted);
METRIC_DECLARE_counter(rows_updated);
METRIC_DECLARE_counter(rows_deleted);
+METRIC_DECLARE_counter(scanners_expired);
METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
+METRIC_DECLARE_gauge_size(active_scanners);
+METRIC_DECLARE_gauge_size(tablet_active_scanners);
namespace kudu {
@@ -261,8 +268,7 @@ TEST_F(TabletServerTest, TestWebPages) {
FLAGS_metrics_retirement_age_ms = 0;
for (int i = 0; i < 3; i++) {
SCOPED_TRACE(i);
- ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", addr, kTabletId),
- &buf));
+ ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", addr), &buf));
// Check that the tablet entry shows up.
ASSERT_STR_CONTAINS(buf.ToString(), "\"type\": \"tablet\"");
@@ -1151,6 +1157,17 @@ TEST_F(TabletServerTest, TestScan) {
int num_rows = AllowSlowTests() ? 10000 : 1000;
InsertTestRowsDirect(0, num_rows);
+ // Instantiate scanner metrics.
+ ASSERT_TRUE(mini_server_->server()->metric_entity());
+ // We don't care what the function is, since the metric is already instantiated.
+ auto active_scanners = METRIC_active_scanners.InstantiateFunctionGauge(
+ mini_server_->server()->metric_entity(), Callback<size_t(void)>());
+ scoped_refptr<TabletReplica> tablet;
+ ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
+ ASSERT_TRUE(tablet->tablet()->GetMetricEntity());
+ scoped_refptr<AtomicGauge<size_t>> tablet_active_scanners =
+ METRIC_tablet_active_scanners.Instantiate(tablet->tablet()->GetMetricEntity(), 0);
+
ScanResponsePB resp;
ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp));
@@ -1163,10 +1180,13 @@ TEST_F(TabletServerTest, TestScan) {
ASSERT_TRUE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
}
+ // Ensure that the scanner shows up in the server and tablet's metrics.
+ ASSERT_EQ(1, active_scanners->value());
+ ASSERT_EQ(1, tablet_active_scanners->value());
+
// Drain all the rows from the scanner.
vector<string> results;
- ASSERT_NO_FATAL_FAILURE(
- DrainScannerToStrings(resp.scanner_id(), schema_, &results));
+ ASSERT_NO_FATAL_FAILURE(DrainScannerToStrings(resp.scanner_id(), schema_, &results));
ASSERT_EQ(num_rows, results.size());
KuduPartialRow row(&schema_);
@@ -1182,6 +1202,45 @@ TEST_F(TabletServerTest, TestScan) {
SharedScanner junk;
ASSERT_FALSE(mini_server_->server()->scanner_manager()->LookupScanner(scanner_id, &junk));
}
+
+ // Ensure that the metrics have been updated now that the scanner is unregistered.
+ ASSERT_EQ(0, active_scanners->value());
+ ASSERT_EQ(0, tablet_active_scanners->value());
+}
+
+TEST_F(TabletServerTest, TestExpiredScanner) {
+ // Make scanners expire quickly.
+ FLAGS_scanner_ttl_ms = 1;
+
+ int num_rows = 100;
+ InsertTestRowsDirect(0, num_rows);
+
+ // Instantiate scanners expired metric.
+ ASSERT_TRUE(mini_server_->server()->metric_entity());
+ scoped_refptr<Counter> scanners_expired = METRIC_scanners_expired.Instantiate(
+ mini_server_->server()->metric_entity());
+
+ // Initially, there've been no scanners, so none have expired.
+ ASSERT_EQ(0, scanners_expired->value());
+
+ // Open a scanner but don't read from it.
+ ScanResponsePB resp;
+ ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp));
+
+ // The scanner should expire after a short time.
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_EQ(1, scanners_expired->value());
+ });
+
+ // Continue the scan. We should get a SCANNER_EXPIRED error.
+ ScanRequestPB req;
+ RpcController rpc;
+ req.set_scanner_id(resp.scanner_id());
+ req.set_call_seq_id(1);
+ resp.Clear();
+ ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+ ASSERT_TRUE(resp.has_error());
+ ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code());
}
TEST_F(TabletServerTest, TestScannerOpenWhenServerShutsDown) {