You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/01/09 03:45:43 UTC

[incubator-pegasus] branch master updated: feat(hotkey): add a function test of hotkey detection (#665)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 685bbd2  feat(hotkey): add a function test of hotkey detection (#665)
685bbd2 is described below

commit 685bbd2730929ceb31959a3629a2e9cc7d8ed55e
Author: Smilencer <52...@qq.com>
AuthorDate: Fri Jan 8 21:45:35 2021 -0600

    feat(hotkey): add a function test of hotkey detection (#665)
---
 src/server/config.min.ini                      |   1 +
 src/server/hotkey_collector.cpp                |  36 ++--
 src/server/hotspot_partition_calculator.cpp    |   8 +-
 src/server/hotspot_partition_calculator.h      |   5 +-
 src/server/test/hotspot_partition_test.cpp     |  12 +-
 src/shell/commands/detect_hotkey.cpp           |   2 +
 src/test/function_test/run.sh                  |   2 +
 src/test/function_test/test_detect_hotspot.cpp | 257 +++++++++++++++++++++++++
 8 files changed, 298 insertions(+), 25 deletions(-)

diff --git a/src/server/config.min.ini b/src/server/config.min.ini
index 0094879..76612b4 100644
--- a/src/server/config.min.ini
+++ b/src/server/config.min.ini
@@ -142,6 +142,7 @@
   available_detect_app = @APP_NAME@
   available_detect_alert_script_dir = ./package/bin
   usage_stat_app = stat
+  enable_detect_hotkey = false
 
 [pegasus.clusters]
   onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 23aa21a..26ff59d 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -23,6 +23,7 @@
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/flags.h>
 #include "base/pegasus_key_schema.h"
+#include "base/pegasus_utils.h"
 
 namespace pegasus {
 namespace server {
@@ -30,13 +31,13 @@ namespace server {
 DSN_DEFINE_uint32(
     "pegasus.server",
     hot_bucket_variance_threshold,
-    3,
+    7,
     "the variance threshold to detect hot bucket during coarse analysis of hotkey detection");
 
 DSN_DEFINE_uint32(
     "pegasus.server",
     hot_key_variance_threshold,
-    3,
+    5,
     "the variance threshold to detect hot key during fine analysis of hotkey detection");
 
 DSN_DEFINE_uint32("pegasus.server",
@@ -168,7 +169,8 @@ inline void hotkey_collector::change_state_by_result()
     case hotkey_collector_state::FINE_DETECTING:
         if (!_result.hot_hash_key.empty()) {
             change_state_to_finished();
-            derror_replica("Find the hotkey: {}", _result.hot_hash_key);
+            derror_replica("Find the hotkey: {}",
+                           pegasus::utils::c_escape_string(_result.hot_hash_key));
         }
         break;
     default:
@@ -239,25 +241,23 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
     switch (now_state) {
     case hotkey_collector_state::COARSE_DETECTING:
     case hotkey_collector_state::FINE_DETECTING:
-        resp.err = dsn::ERR_INVALID_STATE;
+        resp.err = dsn::ERR_BUSY;
         hint = fmt::format("still detecting {} hotkey, state is {}",
                            dsn::enum_to_string(_hotkey_type),
                            enum_to_string(now_state));
-        dwarn_replica(hint);
-        return;
+        break;
     case hotkey_collector_state::FINISHED:
-        resp.err = dsn::ERR_INVALID_STATE;
-        hint = fmt::format(
-            "{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
-            dsn::enum_to_string(_hotkey_type));
-        dwarn_replica(hint);
-        return;
+        resp.err = dsn::ERR_BUSY;
+        hint = fmt::format("{} hotkey result has been found: {}, you can send a stop rpc to "
+                           "restart hotkey detection",
+                           dsn::enum_to_string(_hotkey_type),
+                           pegasus::utils::c_escape_string(_result.hot_hash_key));
+        break;
     case hotkey_collector_state::STOPPED:
         change_state_to_coarse_detecting();
         resp.err = dsn::ERR_OK;
         hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
-        ddebug_replica(hint);
-        return;
+        break;
     default:
         hint = "invalid collector state";
         resp.err = dsn::ERR_INVALID_STATE;
@@ -265,6 +265,8 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
         derror_replica(hint);
         dassert(false, "invalid collector state");
     }
+    resp.__set_err_hint(hint);
+    dwarn_replica(hint);
 }
 
 void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
@@ -280,13 +282,13 @@ void hotkey_collector::query_result(dsn::replication::detect_hotkey_response &re
 {
     if (_state != hotkey_collector_state::FINISHED) {
         resp.err = dsn::ERR_BUSY;
-        std::string hint = fmt::format("hotkey is detecting now, now state: {}",
-                                       dsn::enum_to_string(_hotkey_type));
+        std::string hint =
+            fmt::format("Can't get hotkey now, now state: {}", enum_to_string(_state.load()));
         resp.__set_err_hint(hint);
         ddebug_replica(hint);
     } else {
         resp.err = dsn::ERR_OK;
-        resp.__set_hotkey_result(_result.hot_hash_key);
+        resp.__set_hotkey_result(pegasus::utils::c_escape_string(_result.hot_hash_key));
     }
 }
 
diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index 2bd6b9d..3b7086b 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -50,7 +50,7 @@ DSN_DEFINE_int32("pegasus.collector",
 
 DSN_DEFINE_int32("pegasus.collector",
                  occurrence_threshold,
-                 100,
+                 3,
                  "hot paritiotion occurrence times' threshold to send rpc to detect hotkey");
 
 void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
@@ -170,7 +170,7 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
     }
 }
 
-/*static*/ void hotspot_partition_calculator::send_detect_hotkey_request(
+void hotspot_partition_calculator::send_detect_hotkey_request(
     const std::string &app_name,
     const uint64_t partition_index,
     const dsn::replication::hotkey_type::type hotkey_type,
@@ -178,8 +178,8 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
 {
     FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
 
-    int app_id;
-    int partition_count;
+    int app_id = -1;
+    int partition_count = -1;
     std::vector<dsn::partition_configuration> partitions;
     _shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
 
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index ddbbb15..dc2db45 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -40,7 +40,10 @@ public:
     hotspot_partition_calculator(const std::string &app_name,
                                  int partition_count,
                                  std::shared_ptr<shell_context> context)
-        : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
+        : _app_name(app_name),
+          _hot_points(partition_count),
+          _shell_context(context),
+          _hotpartition_counter(partition_count)
     {
         init_perf_counter(partition_count);
     }
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index 645fa8a..d390f18 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -25,6 +25,7 @@ namespace pegasus {
 namespace server {
 
 DSN_DECLARE_int32(occurrence_threshold);
+DSN_DECLARE_bool(enable_detect_hotkey);
 
 class hotspot_partition_test : public pegasus_server_test_base
 {
@@ -33,8 +34,13 @@ public:
     {
         dsn::fail::setup();
         dsn::fail::cfg("send_detect_hotkey_request", "return()");
+        FLAGS_enable_detect_hotkey = true;
     };
-    ~hotspot_partition_test() { dsn::fail::teardown(); }
+    ~hotspot_partition_test()
+    {
+        FLAGS_enable_detect_hotkey = false;
+        dsn::fail::teardown();
+    }
 
     hotspot_partition_calculator calculator;
 
@@ -136,8 +142,8 @@ TEST_F(hotspot_partition_test, send_detect_hotkey_request)
     expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold;
     aggregate_analyse_data(test_rows, expect_result, FLAGS_occurrence_threshold);
     const int back_to_normal = 30;
-    expect_result[READ_HOT_PARTITION][0] = FLAGS_occurrence_threshold - back_to_normal;
-    expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold - back_to_normal;
+    expect_result[READ_HOT_PARTITION][0] = 0;
+    expect_result[WRITE_HOT_PARTITION][1] = 0;
     aggregate_analyse_data(generate_row_data(), expect_result, back_to_normal);
 }
 
diff --git a/src/shell/commands/detect_hotkey.cpp b/src/shell/commands/detect_hotkey.cpp
index d2ffed7..fd4bbe1 100644
--- a/src/shell/commands/detect_hotkey.cpp
+++ b/src/shell/commands/detect_hotkey.cpp
@@ -40,6 +40,8 @@ bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req,
         req.action = dsn::replication::detect_action::START;
     } else if (!strcasecmp(hotkey_action.c_str(), "stop")) {
         req.action = dsn::replication::detect_action::STOP;
+    } else if (!strcasecmp(hotkey_action.c_str(), "query")) { // compare the action, not the type
+        req.action = dsn::replication::detect_action::QUERY;
     } else {
         err_info =
             fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n",
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index 3aa5d92..334b590 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -71,4 +71,6 @@ if [ $on_travis == "NO" ]; then
     exit_if_fail $? "run test recovery failed: $test_case $config_file $table_name"
     GTEST_OUTPUT="xml:$REPORT_DIR/bulk_load.xml" GTEST_FILTER="bulk_load_test.*" ./$test_case $config_file $table_name
     exit_if_fail $? "run test bulk load failed: $test_case $config_file $table_name"
+    GTEST_OUTPUT="xml:$REPORT_DIR/test_detect_hotspot.xml" GTEST_FILTER="test_detect_hotspot.*" ./$test_case $config_file $table_name
+    exit_if_fail $? "run test detect hotspot failed: $test_case $config_file $table_name"
 fi
diff --git a/src/test/function_test/test_detect_hotspot.cpp b/src/test/function_test/test_detect_hotspot.cpp
new file mode 100644
index 0000000..d60b90a
--- /dev/null
+++ b/src/test/function_test/test_detect_hotspot.cpp
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <libgen.h>
+
+#include <dsn/utility/filesystem.h>
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <pegasus/client.h>
+#include <gtest/gtest.h>
+#include <boost/lexical_cast.hpp>
+#include <dsn/utility/rand.h>
+
+#include "base/pegasus_const.h"
+#include "global_env.h"
+
+using namespace ::dsn;
+using namespace ::dsn::replication;
+using namespace pegasus;
+
+// When `is_hotkey` is set, returns the fixed hot key if next_u32(100) < `probability`;
+// otherwise returns 20 characters picked at random from the printable charset below.
+static std::string generate_hash_key_by_random(bool is_hotkey, int probability = 100)
+{
+    // Widen before comparing so a negative `probability` can never match.
+    if (is_hotkey && (static_cast<int64_t>(dsn::rand::next_u32(100)) < probability)) {
+        return "ThisisahotkeyThisisahotkey";
+    }
+    static const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
+                                   "!@#$%^&*()`~-_=+[{]}\\|;:'\",<.>/? "); // '}' was missing
+    std::string result;
+    for (int i = 0; i < 20; i++) {
+        result += chars[dsn::rand::next_u32(chars.size())];
+    }
+    return result;
+}
+
+enum detection_type
+{
+    read_data, // traffic is issued with get(); results are queried as READ hotkeys
+    write_data // traffic is issued with set(); results are queried as WRITE hotkeys
+};
+enum key_type
+{
+    random_dataset, // hash keys are all randomly generated
+    hotspot_dataset // random hash keys mixed with the fixed hot key
+};
+
+// Function test for hotkey detection: boots an onebox with `enable_detect_hotkey = true`,
+// drives read/write traffic through the client and checks the detect_hotkey RPC results.
+class test_detect_hotspot : public testing::Test
+{
+public:
+    virtual void SetUp() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("pwd");
+        system("./run.sh clear_onebox");
+        system("cp src/server/config.min.ini config-server-test-hotspot.ini");
+        system("sed -i \"/^\\s*enable_detect_hotkey/c enable_detect_hotkey = "
+               "true\" config-server-test-hotspot.ini");
+        system("./run.sh start_onebox -c -w --config_path config-server-test-hotspot.ini");
+        std::this_thread::sleep_for(std::chrono::seconds(3));
+
+        std::vector<dsn::rpc_address> meta_list;
+        replica_helper::load_meta_servers(
+            meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "single_master_cluster");
+
+        ddl_client = std::make_shared<replication_ddl_client>(meta_list);
+        pg_client =
+            pegasus::pegasus_client_factory::get_client("single_master_cluster", app_name.c_str());
+
+        auto err = ddl_client->create_app(app_name.c_str(), "pegasus", 8, 3, {}, false);
+        ASSERT_EQ(dsn::ERR_OK, err);
+
+        ddl_client->list_app(app_name, app_id, partition_count, partitions);
+    }
+
+    virtual void TearDown() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+
+    // Issues set/get requests until `time_duration` (measured with dsn_now_s()) has elapsed.
+    void generate_dataset(int64_t time_duration, detection_type dt, key_type kt)
+    {
+        int64_t start = dsn_now_s();
+        int err = PERR_OK;
+        ASSERT_NE(pg_client, nullptr);
+
+        for (int i = 0; dsn_now_s() - start < time_duration; ++i %= 1000) { // i cycles 0..999
+            std::string index = std::to_string(i);
+            std::string h_key = generate_hash_key_by_random(kt == key_type::hotspot_dataset, 50);
+            std::string s_key = "sortkey_" + index;
+            std::string value = "value_" + index;
+            if (dt == detection_type::write_data) {
+                err = pg_client->set(h_key, s_key, value);
+                ASSERT_EQ(err, PERR_OK);
+            } else {
+                err = pg_client->get(h_key, s_key, value);
+                ASSERT_TRUE(err == PERR_OK || err == PERR_NOT_FOUND);
+            }
+        }
+    }
+
+    // Queries every partition for a `dt` hotkey and checks it against `expect_hotspot`, then
+    // stops detection on all partitions and verifies each collector reports STOPPED.
+    void get_result(detection_type dt, key_type expect_hotspot)
+    {
+        req.type = dt == detection_type::write_data ? dsn::replication::hotkey_type::type::WRITE
+                                                    : dsn::replication::hotkey_type::type::READ;
+        req.action = dsn::replication::detect_action::QUERY;
+
+        bool find_hotkey = false;
+        for (int idx = 0; idx < static_cast<int>(partitions.size()); idx++) {
+            req.pid = dsn::gpid(app_id, idx);
+            auto errinfo = ddl_client->detect_hotkey(partitions[idx].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            if (!resp.hotkey_result.empty()) {
+                find_hotkey = true;
+                break;
+            }
+        }
+        if (expect_hotspot == key_type::hotspot_dataset) {
+            ASSERT_TRUE(find_hotkey);
+            ASSERT_EQ(resp.err, dsn::ERR_OK);
+            ASSERT_EQ(resp.hotkey_result, "ThisisahotkeyThisisahotkey");
+        } else {
+            ASSERT_FALSE(find_hotkey);
+        }
+
+        // Wait for collector sending the next start detecting command
+        sleep(15);
+
+        req.action = dsn::replication::detect_action::STOP;
+        for (int idx = 0; idx < static_cast<int>(partitions.size()); idx++) {
+            // Target each partition's own pid; the pid left by the query loop must not be reused.
+            req.pid = dsn::gpid(app_id, idx);
+            auto errinfo = ddl_client->detect_hotkey(partitions[idx].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            ASSERT_EQ(resp.err, dsn::ERR_OK);
+        }
+
+        req.action = dsn::replication::detect_action::QUERY;
+        for (int idx = 0; idx < static_cast<int>(partitions.size()); idx++) {
+            req.pid = dsn::gpid(app_id, idx);
+            auto errinfo = ddl_client->detect_hotkey(partitions[idx].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            ASSERT_EQ(resp.err_hint,
+                      "Can't get hotkey now, now state: hotkey_collector_state::STOPPED");
+        }
+    }
+
+    void write_hotspot_data()
+    {
+        generate_dataset(warmup_second, detection_type::write_data, key_type::random_dataset);
+        generate_dataset(
+            max_detection_second, detection_type::write_data, key_type::hotspot_dataset);
+        get_result(detection_type::write_data, key_type::hotspot_dataset);
+    }
+
+    void write_random_data()
+    {
+        generate_dataset(
+            max_detection_second, detection_type::write_data, key_type::random_dataset);
+        get_result(detection_type::write_data, key_type::random_dataset);
+    }
+
+    // Starts WRITE detection on one partition and checks it is STOPPED again without a STOP rpc.
+    void capture_until_maxtime()
+    {
+        int target_partition = 2;
+        req.type = dsn::replication::hotkey_type::type::WRITE;
+        req.action = dsn::replication::detect_action::START;
+
+        req.pid = dsn::gpid(app_id, target_partition);
+        auto errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(errinfo, dsn::ERR_OK);
+        ASSERT_EQ(resp.err, dsn::ERR_OK);
+
+        req.action = dsn::replication::detect_action::QUERY;
+        errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(resp.err_hint,
+                  "Can't get hotkey now, now state: hotkey_collector_state::COARSE_DETECTING");
+
+        // NOTE(review): presumably chosen to exceed the server's max detection time -- confirm
+        int load_seconds = 160;
+        generate_dataset(load_seconds, detection_type::write_data, key_type::random_dataset);
+
+        req.action = dsn::replication::detect_action::QUERY;
+        errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(resp.err_hint,
+                  "Can't get hotkey now, now state: hotkey_collector_state::STOPPED");
+    }
+
+    void read_hotspot_data()
+    {
+        generate_dataset(warmup_second, detection_type::read_data, key_type::hotspot_dataset);
+        generate_dataset(
+            max_detection_second, detection_type::read_data, key_type::hotspot_dataset);
+        get_result(detection_type::read_data, key_type::hotspot_dataset);
+    }
+
+    void read_random_data()
+    {
+        generate_dataset(max_detection_second, detection_type::read_data, key_type::random_dataset);
+        get_result(detection_type::read_data, key_type::random_dataset);
+    }
+
+    const std::string app_name = "hotspot_test";
+    const int64_t max_detection_second = 100;
+    const int64_t warmup_second = 30;
+    int32_t app_id = -1;
+    int32_t partition_count = -1;
+    std::vector<dsn::partition_configuration> partitions;
+    dsn::replication::detect_hotkey_response resp;
+    dsn::replication::detect_hotkey_request req;
+    std::shared_ptr<replication_ddl_client> ddl_client;
+    pegasus::pegasus_client *pg_client = nullptr;
+};
+
+TEST_F(test_detect_hotspot, write_hotspot_data) // runs all five scenarios in sequence
+{
+    std::cout << "start testing write hotspot data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(write_hotspot_data()); // a helper's fatal ASSERT only exits the helper
+    std::cout << "write hotspot data passed....." << std::endl;
+    std::cout << "start testing write random data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(write_random_data());
+    std::cout << "write random data passed....." << std::endl;
+    std::cout << "start testing max detection time..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(capture_until_maxtime());
+    std::cout << "max detection time passed....." << std::endl;
+    std::cout << "start testing read hotspot data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(read_hotspot_data());
+    std::cout << "read hotspot data passed....." << std::endl;
+    std::cout << "start testing read random data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(read_random_data());
+    std::cout << "read random data passed....." << std::endl;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org