You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/28 06:17:58 UTC

[kudu] 01/02: [tserver] add test to reproduce KUDU-1587 conditions

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c6d438ab417009e8007a1de274178d0bcf0dfb63
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Aug 7 20:51:04 2020 -0700

    [tserver] add test to reproduce KUDU-1587 conditions
    
    Added a test to reproduce conditions described in KUDU-1587.
    As of now, the test is disabled: it will be enabled once
    KUDU-1587 is addressed.
    
    Change-Id: I515a1b26152680ee9b9361afcf84fec39b8f962d
    Reviewed-on: http://gerrit.cloudera.org:8080/16312
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tserver/tablet_server-test.cc | 133 +++++++++++++++++++++++++++++++++
 1 file changed, 133 insertions(+)

diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 62de1d8..dcab726 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -73,6 +73,7 @@
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -139,9 +140,11 @@ using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TabletStatePB;
 using kudu::tablet::TabletSuperBlockPB;
+using std::endl;
 using std::make_shared;
 using std::map;
 using std::pair;
+using std::ostringstream;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -184,9 +187,11 @@ DECLARE_int32(maintenance_manager_num_threads);
 DECLARE_int32(maintenance_manager_polling_interval_ms);
 DECLARE_int32(memory_pressure_percentage);
 DECLARE_int32(metrics_retirement_age_ms);
+DECLARE_int32(rpc_service_queue_length);
 DECLARE_int32(scanner_batch_size_rows);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_ttl_ms);
+DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
 DECLARE_int32(workload_stats_rate_collection_min_interval_ms);
 DECLARE_int32(workload_stats_metric_collection_interval_ms);
 DECLARE_string(block_manager);
@@ -199,6 +204,8 @@ METRIC_DECLARE_counter(log_block_manager_holes_punched);
 METRIC_DECLARE_counter(rows_inserted);
 METRIC_DECLARE_counter(rows_updated);
 METRIC_DECLARE_counter(rows_deleted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+METRIC_DECLARE_counter(rpcs_timed_out_in_queue);
 METRIC_DECLARE_counter(scanners_expired);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
@@ -206,6 +213,9 @@ METRIC_DECLARE_gauge_size(active_scanners);
 METRIC_DECLARE_gauge_size(tablet_active_scanners);
 METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
 METRIC_DECLARE_histogram(flush_dms_duration);
+METRIC_DECLARE_histogram(op_apply_queue_length);
+METRIC_DECLARE_histogram(op_apply_queue_time);
+
 
 namespace kudu {
 
@@ -4351,5 +4361,128 @@ TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
   });
 }
 
+class OpApplyQueueTest : public TabletServerTestBase {
+ public:
+  // Starts the tablet server; override to start it later.
+  void SetUp() override {
+    // Since scenarios of this test make bursts of requests, set the maximum
+    // length of the service queue to accommodate many requests.
+    FLAGS_rpc_service_queue_length = 1000;
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+  }
+
+  static constexpr const int32_t kInjectedLatencyMs = 100;
+};
+
+// This is a regression test for KUDU-1587.
+// TODO(aserbin): enable the test once KUDU-1587 is addressed.
+TEST_F(OpApplyQueueTest, DISABLED_ApplyQueueBackpressure) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr size_t kNumCalls = 1000;
+  const int num_cpus = base::NumCPUs();
+  WriteRequestPB req;
+  req.set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  // Inject latency into WriteOp::Apply().
+  FLAGS_tablet_inject_latency_on_apply_write_op_ms = kInjectedLatencyMs;
+
+  // Send many calls to the tablet server without waiting for responses.
+  // After sending the first chunk, wait for some time to allow for propagating
+  // the operations to the apply queue and to have apply queue times growing.
+  vector<unique_ptr<RpcController>> controllers;
+  vector<unique_ptr<WriteResponsePB>> responses;
+  CountDownLatch latch(kNumCalls);
+  for (size_t idx = 0; idx < kNumCalls; ++idx) {
+    controllers.emplace_back(new RpcController);
+    responses.emplace_back(new WriteResponsePB);
+
+    req.clear_row_operations();
+    auto* data = req.mutable_row_operations();
+    AddTestRowWithNullableStringToPB(
+        RowOperationsPB::INSERT, schema_, idx, idx, nullptr, data);
+    proxy_->AsyncRequest("Write", req, responses.back().get(), controllers.back().get(),
+                         [&latch]() { latch.CountDown(); });
+    if (idx == 10 * num_cpus) {
+      // Allow time to realize what the current queue times are once ops have
+      // reached the apply queue.
+      SleepFor(MonoDelta::FromMilliseconds(2 * kInjectedLatencyMs));
+    }
+  }
+
+  // Wait for calls to be processed before capturing the apply queue stats.
+  latch.Wait();
+
+  size_t num_ok = 0;
+  size_t num_error = 0;
+  for (const auto& ctl : controllers) {
+    if (ctl->status().ok()) {
+      ++num_ok;
+    } else {
+      ++num_error;
+    }
+  }
+  ASSERT_EQ(kNumCalls, num_ok + num_error);
+
+  // Not all requests should succeed -- due to long apply times, some should be
+  // rejected.
+  EXPECT_GT(num_error, 0);
+
+  {
+    // No RPC queue overflows are expected.
+    auto rpc_queue_overflows = METRIC_rpcs_queue_overflow.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, rpc_queue_overflows->value());
+  }
+
+  {
+    // No requests should time out while in the queue.
+    auto timed_out_in_rpc_queue = METRIC_rpcs_timed_out_in_queue.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, timed_out_in_rpc_queue->value());
+  }
+
+  {
+    // Some calls should be rejected due to an overloaded apply queue,
+    // so the corresponding queue times should not get too close to the value of
+    // total_requests * injected_latency / number_of_apply_threads.
+    auto qt = METRIC_op_apply_queue_time.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << qt->prototype()->name() << ":" << endl;
+    const auto* h = qt->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics rather than exact theoretical thresholds.
+    // They depend on the injected latency and the 'overloaded' threshold
+    // for the apply queue.
+    EXPECT_LT(h->MaxValue(),
+              kInjectedLatencyMs * 1000 * kNumCalls * 4 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(99),
+              kInjectedLatencyMs * 1000 * kNumCalls * 3 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(75),
+              kInjectedLatencyMs * 1000 * kNumCalls * 2 / (4 * num_cpus));
+  }
+
+  {
+    // With the current apply latency, the queue should not get too long.
+    auto ql = METRIC_op_apply_queue_length.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << ql->prototype()->name() << ":" << endl;
+    const auto* h = ql->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics as well. They depend on the injected latency
+    // and the 'overloaded' threshold for the apply queue.
+    EXPECT_LT(h->MaxValue(), 3 * kNumCalls / 4);
+    EXPECT_LT(h->MeanValue(), kNumCalls / 2);
+  }
+}
+
 } // namespace tserver
 } // namespace kudu