You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/28 06:17:58 UTC
[kudu] 01/02: [tserver] add test to reproduce KUDU-1587 conditions
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit c6d438ab417009e8007a1de274178d0bcf0dfb63
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Aug 7 20:51:04 2020 -0700
[tserver] add test to reproduce KUDU-1587 conditions
Added a test to reproduce conditions described in KUDU-1587.
As of now, the test is disabled: it will be enabled once
KUDU-1587 is addressed.
Change-Id: I515a1b26152680ee9b9361afcf84fec39b8f962d
Reviewed-on: http://gerrit.cloudera.org:8080/16312
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/tserver/tablet_server-test.cc | 133 +++++++++++++++++++++++++++++++++
1 file changed, 133 insertions(+)
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 62de1d8..dcab726 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -73,6 +73,7 @@
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
@@ -139,9 +140,11 @@ using kudu::tablet::Tablet;
using kudu::tablet::TabletReplica;
using kudu::tablet::TabletStatePB;
using kudu::tablet::TabletSuperBlockPB;
+using std::endl;
using std::make_shared;
using std::map;
using std::pair;
+using std::ostringstream;
using std::set;
using std::shared_ptr;
using std::string;
@@ -184,9 +187,11 @@ DECLARE_int32(maintenance_manager_num_threads);
DECLARE_int32(maintenance_manager_polling_interval_ms);
DECLARE_int32(memory_pressure_percentage);
DECLARE_int32(metrics_retirement_age_ms);
+DECLARE_int32(rpc_service_queue_length);
DECLARE_int32(scanner_batch_size_rows);
DECLARE_int32(scanner_gc_check_interval_us);
DECLARE_int32(scanner_ttl_ms);
+DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
DECLARE_int32(workload_stats_rate_collection_min_interval_ms);
DECLARE_int32(workload_stats_metric_collection_interval_ms);
DECLARE_string(block_manager);
@@ -199,6 +204,8 @@ METRIC_DECLARE_counter(log_block_manager_holes_punched);
METRIC_DECLARE_counter(rows_inserted);
METRIC_DECLARE_counter(rows_updated);
METRIC_DECLARE_counter(rows_deleted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+METRIC_DECLARE_counter(rpcs_timed_out_in_queue);
METRIC_DECLARE_counter(scanners_expired);
METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
@@ -206,6 +213,9 @@ METRIC_DECLARE_gauge_size(active_scanners);
METRIC_DECLARE_gauge_size(tablet_active_scanners);
METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
METRIC_DECLARE_histogram(flush_dms_duration);
+METRIC_DECLARE_histogram(op_apply_queue_length);
+METRIC_DECLARE_histogram(op_apply_queue_time);
+
namespace kudu {
@@ -4351,5 +4361,128 @@ TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
});
}
+class OpApplyQueueTest : public TabletServerTestBase {
+ public:
+  // Enlarges the RPC service queue for bursty scenarios and starts the tablet
+  // server immediately; override this method to start the server later.
+  void SetUp() override {
+    FLAGS_rpc_service_queue_length = 1000;  // Accommodate bursts of requests.
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+  }
+
+  // Latency (in milliseconds) the scenarios inject into WriteOp::Apply().
+  static constexpr int32_t kInjectedLatencyMs = 100;
+};
+
+// This is a regression test for KUDU-1587.
+// TODO(aserbin): enable the test once KUDU-1587 is addressed.
+TEST_F(OpApplyQueueTest, DISABLED_ApplyQueueBackpressure) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr size_t kNumCalls = 1000;
+  const size_t num_cpus = static_cast<size_t>(base::NumCPUs());
+  WriteRequestPB req;
+  req.set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  // Inject latency into WriteOp::Apply().
+  FLAGS_tablet_inject_latency_on_apply_write_op_ms = kInjectedLatencyMs;
+
+  // Send many calls to the tablet server without waiting for responses.
+  // After sending the first chunk, wait for some time to allow for propagating
+  // the operations to the apply queue and to have apply queue times growing.
+  vector<unique_ptr<RpcController>> controllers;
+  vector<unique_ptr<WriteResponsePB>> responses;
+  CountDownLatch latch(kNumCalls);
+  for (size_t idx = 0; idx < kNumCalls; ++idx) {
+    controllers.emplace_back(new RpcController);
+    responses.emplace_back(new WriteResponsePB);
+
+    req.clear_row_operations();
+    auto* data = req.mutable_row_operations();
+    AddTestRowWithNullableStringToPB(
+        RowOperationsPB::INSERT, schema_, idx, idx, nullptr, data);
+    proxy_->AsyncRequest("Write", req, responses.back().get(), controllers.back().get(),
+                         [&latch]() { latch.CountDown(); });
+    if (idx == 10 * num_cpus) {
+      // Pause to let the ops sent so far reach the apply queue, giving the
+      // apply queue times a chance to build up.
+      SleepFor(MonoDelta::FromMilliseconds(2 * kInjectedLatencyMs));
+    }
+  }
+
+  // Wait for calls to be processed before capturing the apply queue stats.
+  latch.Wait();
+
+  size_t num_ok = 0;
+  size_t num_error = 0;
+  for (const auto& ctl : controllers) {
+    if (ctl->status().ok()) {
+      ++num_ok;
+    } else {
+      ++num_error;
+    }
+  }
+  ASSERT_EQ(kNumCalls, num_ok + num_error);
+
+  // Not all requests should succeed -- due to long apply times, some should be
+  // rejected.
+  EXPECT_GT(num_error, 0U);
+
+  {
+    // No RPC queue overflows are expected.
+    auto rpc_queue_overflows = METRIC_rpcs_queue_overflow.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, rpc_queue_overflows->value());
+  }
+
+  {
+    // No requests should time out while in the queue.
+    auto timed_out_in_rpc_queue = METRIC_rpcs_timed_out_in_queue.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, timed_out_in_rpc_queue->value());
+  }
+
+  {
+    // Some calls should be rejected due to the overloaded apply queue,
+    // so the corresponding queue times should not get too close to the value of
+    // total_requests * injected_latency / number_of_apply_threads.
+    auto qt = METRIC_op_apply_queue_time.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << qt->prototype()->name() << ":" << endl;
+    const auto* h = qt->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics rather than exact theoretical thresholds.
+    // They depend on the injected latency and the 'overloaded' threshold
+    // for the apply queue.
+    EXPECT_LT(h->MaxValue(),
+              kInjectedLatencyMs * 1000 * kNumCalls * 4 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(99),
+              kInjectedLatencyMs * 1000 * kNumCalls * 3 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(75),
+              kInjectedLatencyMs * 1000 * kNumCalls * 2 / (4 * num_cpus));
+  }
+
+  {
+    // With the current apply latency, the queue should not get too long.
+    auto ql = METRIC_op_apply_queue_length.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << ql->prototype()->name() << ":" << endl;
+    const auto* h = ql->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics as well. They depend on the injected latency
+    // and the 'overloaded' threshold for the apply queue.
+    EXPECT_LT(h->MaxValue(), 3 * kNumCalls / 4);
+    EXPECT_LT(h->MeanValue(), kNumCalls / 2);
+  }
+}
+
} // namespace tserver
} // namespace kudu