You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/01/09 03:45:43 UTC
[incubator-pegasus] branch master updated: feat(hotkey): add a function test of hotkey detection (#665)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 685bbd2 feat(hotkey): add a function test of hotkey detection (#665)
685bbd2 is described below
commit 685bbd2730929ceb31959a3629a2e9cc7d8ed55e
Author: Smilencer <52...@qq.com>
AuthorDate: Fri Jan 8 21:45:35 2021 -0600
feat(hotkey): add a function test of hotkey detection (#665)
---
src/server/config.min.ini | 1 +
src/server/hotkey_collector.cpp | 36 ++--
src/server/hotspot_partition_calculator.cpp | 8 +-
src/server/hotspot_partition_calculator.h | 5 +-
src/server/test/hotspot_partition_test.cpp | 12 +-
src/shell/commands/detect_hotkey.cpp | 2 +
src/test/function_test/run.sh | 2 +
src/test/function_test/test_detect_hotspot.cpp | 257 +++++++++++++++++++++++++
8 files changed, 298 insertions(+), 25 deletions(-)
diff --git a/src/server/config.min.ini b/src/server/config.min.ini
index 0094879..76612b4 100644
--- a/src/server/config.min.ini
+++ b/src/server/config.min.ini
@@ -142,6 +142,7 @@
available_detect_app = @APP_NAME@
available_detect_alert_script_dir = ./package/bin
usage_stat_app = stat
+ enable_detect_hotkey = false
[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index 23aa21a..26ff59d 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -23,6 +23,7 @@
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include "base/pegasus_key_schema.h"
+#include "base/pegasus_utils.h"
namespace pegasus {
namespace server {
@@ -30,13 +31,13 @@ namespace server {
DSN_DEFINE_uint32(
"pegasus.server",
hot_bucket_variance_threshold,
- 3,
+ 7,
"the variance threshold to detect hot bucket during coarse analysis of hotkey detection");
DSN_DEFINE_uint32(
"pegasus.server",
hot_key_variance_threshold,
- 3,
+ 5,
"the variance threshold to detect hot key during fine analysis of hotkey detection");
DSN_DEFINE_uint32("pegasus.server",
@@ -168,7 +169,8 @@ inline void hotkey_collector::change_state_by_result()
case hotkey_collector_state::FINE_DETECTING:
if (!_result.hot_hash_key.empty()) {
change_state_to_finished();
- derror_replica("Find the hotkey: {}", _result.hot_hash_key);
+ derror_replica("Find the hotkey: {}",
+ pegasus::utils::c_escape_string(_result.hot_hash_key));
}
break;
default:
@@ -239,25 +241,23 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
switch (now_state) {
case hotkey_collector_state::COARSE_DETECTING:
case hotkey_collector_state::FINE_DETECTING:
- resp.err = dsn::ERR_INVALID_STATE;
+ resp.err = dsn::ERR_BUSY;
hint = fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(now_state));
- dwarn_replica(hint);
- return;
+ break;
case hotkey_collector_state::FINISHED:
- resp.err = dsn::ERR_INVALID_STATE;
- hint = fmt::format(
- "{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
- dsn::enum_to_string(_hotkey_type));
- dwarn_replica(hint);
- return;
+ resp.err = dsn::ERR_BUSY;
+ hint = fmt::format("{} hotkey result has been found: {}, you can send a stop rpc to "
+ "restart hotkey detection",
+ dsn::enum_to_string(_hotkey_type),
+ pegasus::utils::c_escape_string(_result.hot_hash_key));
+ break;
case hotkey_collector_state::STOPPED:
change_state_to_coarse_detecting();
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
- ddebug_replica(hint);
- return;
+ break;
default:
hint = "invalid collector state";
resp.err = dsn::ERR_INVALID_STATE;
@@ -265,6 +265,8 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
derror_replica(hint);
dassert(false, "invalid collector state");
}
+ resp.__set_err_hint(hint);
+ dwarn_replica(hint);
}
void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
@@ -280,13 +282,13 @@ void hotkey_collector::query_result(dsn::replication::detect_hotkey_response &re
{
if (_state != hotkey_collector_state::FINISHED) {
resp.err = dsn::ERR_BUSY;
- std::string hint = fmt::format("hotkey is detecting now, now state: {}",
- dsn::enum_to_string(_hotkey_type));
+ std::string hint =
+ fmt::format("Can't get hotkey now, now state: {}", enum_to_string(_state.load()));
resp.__set_err_hint(hint);
ddebug_replica(hint);
} else {
resp.err = dsn::ERR_OK;
- resp.__set_hotkey_result(_result.hot_hash_key);
+ resp.__set_hotkey_result(pegasus::utils::c_escape_string(_result.hot_hash_key));
}
}
diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index 2bd6b9d..3b7086b 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -50,7 +50,7 @@ DSN_DEFINE_int32("pegasus.collector",
DSN_DEFINE_int32("pegasus.collector",
occurrence_threshold,
- 100,
+ 3,
"hot paritiotion occurrence times' threshold to send rpc to detect hotkey");
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
@@ -170,7 +170,7 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
}
}
-/*static*/ void hotspot_partition_calculator::send_detect_hotkey_request(
+void hotspot_partition_calculator::send_detect_hotkey_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::replication::hotkey_type::type hotkey_type,
@@ -178,8 +178,8 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
{
FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
- int app_id;
- int partition_count;
+ int app_id = -1;
+ int partition_count = -1;
std::vector<dsn::partition_configuration> partitions;
_shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index ddbbb15..dc2db45 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -40,7 +40,10 @@ public:
hotspot_partition_calculator(const std::string &app_name,
int partition_count,
std::shared_ptr<shell_context> context)
- : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
+ : _app_name(app_name),
+ _hot_points(partition_count),
+ _shell_context(context),
+ _hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
}
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index 645fa8a..d390f18 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -25,6 +25,7 @@ namespace pegasus {
namespace server {
DSN_DECLARE_int32(occurrence_threshold);
+DSN_DECLARE_bool(enable_detect_hotkey);
class hotspot_partition_test : public pegasus_server_test_base
{
@@ -33,8 +34,13 @@ public:
{
dsn::fail::setup();
dsn::fail::cfg("send_detect_hotkey_request", "return()");
+ FLAGS_enable_detect_hotkey = true;
};
- ~hotspot_partition_test() { dsn::fail::teardown(); }
+ ~hotspot_partition_test()
+ {
+ FLAGS_enable_detect_hotkey = false;
+ dsn::fail::teardown();
+ }
hotspot_partition_calculator calculator;
@@ -136,8 +142,8 @@ TEST_F(hotspot_partition_test, send_detect_hotkey_request)
expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold;
aggregate_analyse_data(test_rows, expect_result, FLAGS_occurrence_threshold);
const int back_to_normal = 30;
- expect_result[READ_HOT_PARTITION][0] = FLAGS_occurrence_threshold - back_to_normal;
- expect_result[WRITE_HOT_PARTITION][1] = FLAGS_occurrence_threshold - back_to_normal;
+ expect_result[READ_HOT_PARTITION][0] = 0;
+ expect_result[WRITE_HOT_PARTITION][1] = 0;
aggregate_analyse_data(generate_row_data(), expect_result, back_to_normal);
}
diff --git a/src/shell/commands/detect_hotkey.cpp b/src/shell/commands/detect_hotkey.cpp
index d2ffed7..fd4bbe1 100644
--- a/src/shell/commands/detect_hotkey.cpp
+++ b/src/shell/commands/detect_hotkey.cpp
@@ -40,6 +40,8 @@ bool generate_hotkey_request(dsn::replication::detect_hotkey_request &req,
req.action = dsn::replication::detect_action::START;
} else if (!strcasecmp(hotkey_action.c_str(), "stop")) {
req.action = dsn::replication::detect_action::STOP;
+ } else if (!strcasecmp(hotkey_action.c_str(), "query")) {
+ req.action = dsn::replication::detect_action::QUERY;
} else {
err_info =
fmt::format("\"{}\" is an invalid hotkey detect action (should be 'start' or 'stop')\n",
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index 3aa5d92..334b590 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -71,4 +71,6 @@ if [ $on_travis == "NO" ]; then
exit_if_fail $? "run test recovery failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/bulk_load.xml" GTEST_FILTER="bulk_load_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test bulk load failed: $test_case $config_file $table_name"
+ GTEST_OUTPUT="xml:$REPORT_DIR/test_detect_hotspot.xml" GTEST_FILTER="test_detect_hotspot.*" ./$test_case $config_file $table_name
+ exit_if_fail $? "run test detect hotspot failed: $test_case $config_file $table_name"
fi
diff --git a/src/test/function_test/test_detect_hotspot.cpp b/src/test/function_test/test_detect_hotspot.cpp
new file mode 100644
index 0000000..d60b90a
--- /dev/null
+++ b/src/test/function_test/test_detect_hotspot.cpp
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <libgen.h>
+
+#include <dsn/utility/filesystem.h>
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <pegasus/client.h>
+#include <gtest/gtest.h>
+#include <boost/lexical_cast.hpp>
+#include <dsn/utility/rand.h>
+
+#include "base/pegasus_const.h"
+#include "global_env.h"
+
+using namespace ::dsn;
+using namespace ::dsn::replication;
+using namespace pegasus;
+
+// Builds the hash key for one generated request.
+//
+// When `is_hotkey` is true, the fixed key "ThisisahotkeyThisisahotkey" is returned whenever a
+// random roll below 100 is smaller than `probability` (so `probability` acts as a percentage and
+// values <= 0 never select the hot key). Otherwise a key of 20 characters drawn from `chars`
+// is returned.
+// NOTE(review): assumes dsn::rand::next_u32(n) yields a value in [0, n) -- confirm.
+static std::string generate_hash_key_by_random(bool is_hotkey, int probability = 100)
+{
+    // Compare as signed ints: comparing the uint32_t roll with an int directly would convert a
+    // negative `probability` to a huge unsigned value and make the check always true.
+    if (is_hotkey && (static_cast<int>(dsn::rand::next_u32(100)) < probability)) {
+        return "ThisisahotkeyThisisahotkey";
+    }
+    // Letters, digits and punctuation; the last group follows keyboard order, so the bracket
+    // pairs are "[{" and "]}".
+    static const std::string chars("abcdefghijklmnopqrstuvwxyz"
+                                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                   "1234567890"
+                                   "!@#$%^&*()"
+                                   "`~-_=+[{]}\\|;:'\",<.>/? ");
+    std::string result;
+    for (int i = 0; i < 20; i++) {
+        result += chars[dsn::rand::next_u32(chars.size())];
+    }
+    return result;
+}
+
+// Which kind of request the test issues (get vs. set) and, correspondingly, which hotkey
+// type it queries on the replica.
+enum detection_type { read_data, write_data };
+
+// Key distribution requested from the dataset generator.
+enum key_type { random_dataset, hotspot_dataset };
+
+// Function-test fixture for hotkey detection.
+//
+// SetUp() restarts onebox from a copy of config.min.ini in which `enable_detect_hotkey` is
+// switched to true and creates the table `app_name`; TearDown() restores a default onebox.
+// The helper methods generate traffic through the pegasus client and inspect the detection
+// state of every partition's primary through the ddl client.
+class test_detect_hotspot : public testing::Test
+{
+public:
+    // Starts a fresh onebox with hotkey detection enabled, creates the test table and caches
+    // its id and partition configurations.
+    virtual void SetUp() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("pwd");
+        system("./run.sh clear_onebox");
+        system("cp src/server/config.min.ini config-server-test-hotspot.ini");
+        system("sed -i \"/^\\s*enable_detect_hotkey/c enable_detect_hotkey = "
+               "true\" config-server-test-hotspot.ini");
+        system("./run.sh start_onebox -c -w --config_path config-server-test-hotspot.ini");
+        std::this_thread::sleep_for(std::chrono::seconds(3));
+
+        std::vector<dsn::rpc_address> meta_list;
+        replica_helper::load_meta_servers(
+            meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "single_master_cluster");
+
+        ddl_client = std::make_shared<replication_ddl_client>(meta_list);
+        pg_client =
+            pegasus::pegasus_client_factory::get_client("single_master_cluster", app_name.c_str());
+
+        auto err = ddl_client->create_app(app_name.c_str(), "pegasus", 8, 3, {}, false);
+        ASSERT_EQ(dsn::ERR_OK, err);
+
+        ddl_client->list_app(app_name, app_id, partition_count, partitions);
+        // Every helper below indexes into `partitions`; fail here rather than later.
+        ASSERT_FALSE(partitions.empty());
+    }
+
+    // Replaces the test onebox with a default one and returns to the working directory.
+    virtual void TearDown() override
+    {
+        chdir(global_env::instance()._pegasus_root.c_str());
+        system("./run.sh clear_onebox");
+        system("./run.sh start_onebox -w");
+        chdir(global_env::instance()._working_dir.c_str());
+    }
+
+    // Issues set (write_data) or get (read_data) requests for about `time_duration` seconds.
+    // Sort keys and values cycle through the indexes 0..999. With `hotspot_dataset` the key
+    // generator is asked for the hot key with probability 50; with `random_dataset` every hash
+    // key is random.
+    void generate_dataset(int64_t time_duration, detection_type dt, key_type kt)
+    {
+        int64_t start = dsn_now_s();
+        int err = PERR_OK;
+        ASSERT_NE(pg_client, nullptr);
+
+        // Spell the enum-to-bool mapping out instead of relying on an implicit conversion.
+        const bool use_hotkey = (kt == key_type::hotspot_dataset);
+        for (int i = 0; dsn_now_s() - start < time_duration; ++i %= 1000) {
+            std::string index = std::to_string(i);
+            std::string h_key = generate_hash_key_by_random(use_hotkey, 50);
+            std::string s_key = "sortkey_" + index;
+            std::string value = "value_" + index;
+            if (dt == detection_type::write_data) {
+                err = pg_client->set(h_key, s_key, value);
+                ASSERT_EQ(err, PERR_OK);
+            } else {
+                err = pg_client->get(h_key, s_key, value);
+                ASSERT_TRUE(err == PERR_OK || err == PERR_NOT_FOUND);
+            }
+        }
+    }
+
+    // Queries every partition's primary for a hotkey of the type matching `dt` and checks the
+    // outcome against `expect_hotspot`. Afterwards it stops detection on every partition and
+    // verifies that each collector reports the STOPPED state.
+    void get_result(detection_type dt, key_type expect_hotspot)
+    {
+        if (dt == detection_type::write_data) {
+            req.type = dsn::replication::hotkey_type::type::WRITE;
+        } else {
+            req.type = dsn::replication::hotkey_type::type::READ;
+        }
+        req.action = dsn::replication::detect_action::QUERY;
+
+        // dsn::gpid is built from int indexes, so keep the loop bound as an int as well.
+        const int partition_total = static_cast<int>(partitions.size());
+
+        bool find_hotkey = false;
+        int partition_index;
+        for (partition_index = 0; partition_index < partition_total; partition_index++) {
+            req.pid = dsn::gpid(app_id, partition_index);
+            auto errinfo =
+                ddl_client->detect_hotkey(partitions[partition_index].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            if (!resp.hotkey_result.empty()) {
+                find_hotkey = true;
+                break;
+            }
+        }
+        if (expect_hotspot == key_type::hotspot_dataset) {
+            ASSERT_TRUE(find_hotkey);
+            ASSERT_EQ(resp.err, dsn::ERR_OK);
+            ASSERT_EQ(resp.hotkey_result, "ThisisahotkeyThisisahotkey");
+        } else {
+            ASSERT_FALSE(find_hotkey);
+        }
+
+        // Wait for collector sending the next start detecting command
+        sleep(15);
+
+        req.action = dsn::replication::detect_action::STOP;
+        for (partition_index = 0; partition_index < partition_total; partition_index++) {
+            // Address each partition explicitly; without this every STOP would reuse the pid
+            // left over from the query loop above.
+            req.pid = dsn::gpid(app_id, partition_index);
+            auto errinfo =
+                ddl_client->detect_hotkey(partitions[partition_index].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            ASSERT_EQ(resp.err, dsn::ERR_OK);
+        }
+
+        req.action = dsn::replication::detect_action::QUERY;
+        for (partition_index = 0; partition_index < partition_total; partition_index++) {
+            req.pid = dsn::gpid(app_id, partition_index);
+            auto errinfo =
+                ddl_client->detect_hotkey(partitions[partition_index].primary, req, resp);
+            ASSERT_EQ(errinfo, dsn::ERR_OK);
+            ASSERT_EQ(resp.err_hint,
+                      "Can't get hotkey now, now state: hotkey_collector_state::STOPPED");
+        }
+    }
+
+    // Warm-up with random writes, then hot-key writes; a write hotkey must be reported.
+    void write_hotspot_data()
+    {
+        // A fatal failure inside a helper only returns from that helper, so propagate it.
+        ASSERT_NO_FATAL_FAILURE(
+            generate_dataset(warmup_second, detection_type::write_data, key_type::random_dataset));
+        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+            max_detection_second, detection_type::write_data, key_type::hotspot_dataset));
+        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::write_data, key_type::hotspot_dataset));
+    }
+
+    // Random writes only; no write hotkey may be reported.
+    void write_random_data()
+    {
+        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+            max_detection_second, detection_type::write_data, key_type::random_dataset));
+        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::write_data, key_type::random_dataset));
+    }
+
+    // Starts write-hotkey detection on one partition by hand, checks that it enters
+    // COARSE_DETECTING, then writes random keys for 160 seconds and expects the collector to be
+    // back in the STOPPED state.
+    void capture_until_maxtime()
+    {
+        int target_partition = 2;
+        ASSERT_LT(target_partition, static_cast<int>(partitions.size()));
+        req.type = dsn::replication::hotkey_type::type::WRITE;
+        req.action = dsn::replication::detect_action::START;
+
+        req.pid = dsn::gpid(app_id, target_partition);
+        auto errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(errinfo, dsn::ERR_OK);
+        ASSERT_EQ(resp.err, dsn::ERR_OK);
+
+        req.action = dsn::replication::detect_action::QUERY;
+        errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(errinfo, dsn::ERR_OK);
+        ASSERT_EQ(resp.err_hint,
+                  "Can't get hotkey now, now state: hotkey_collector_state::COARSE_DETECTING");
+
+        // Keep writing random keys for 160 seconds.
+        // NOTE(review): presumably this must exceed the server-side detection timeout (the
+        // `max_seconds_to_detect_hotkey` setting) so that the collector gives up by itself --
+        // confirm that setting's value.
+        int max_seconds_to_detect_hotkey = 160;
+        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+            max_seconds_to_detect_hotkey, detection_type::write_data, key_type::random_dataset));
+
+        req.action = dsn::replication::detect_action::QUERY;
+        errinfo = ddl_client->detect_hotkey(partitions[target_partition].primary, req, resp);
+        ASSERT_EQ(errinfo, dsn::ERR_OK);
+        ASSERT_EQ(resp.err_hint,
+                  "Can't get hotkey now, now state: hotkey_collector_state::STOPPED");
+    }
+
+    // Hot-key reads for the warm-up and the detection window; a read hotkey must be reported.
+    void read_hotspot_data()
+    {
+        ASSERT_NO_FATAL_FAILURE(
+            generate_dataset(warmup_second, detection_type::read_data, key_type::hotspot_dataset));
+        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+            max_detection_second, detection_type::read_data, key_type::hotspot_dataset));
+        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::read_data, key_type::hotspot_dataset));
+    }
+
+    // Random reads only; no read hotkey may be reported.
+    void read_random_data()
+    {
+        ASSERT_NO_FATAL_FAILURE(generate_dataset(
+            max_detection_second, detection_type::read_data, key_type::random_dataset));
+        ASSERT_NO_FATAL_FAILURE(get_result(detection_type::read_data, key_type::random_dataset));
+    }
+
+    const std::string app_name = "hotspot_test";
+    // Seconds of traffic generated for one detection round.
+    const int64_t max_detection_second = 100;
+    // Seconds of traffic generated before a detection round.
+    const int64_t warmup_second = 30;
+    // Filled in by list_app() in SetUp(); initialised so that a failed SetUp() never leaves
+    // indeterminate values behind.
+    int32_t app_id = 0;
+    int32_t partition_count = 0;
+    std::vector<dsn::partition_configuration> partitions;
+    // Request/response objects reused by all RPCs issued from the helpers.
+    dsn::replication::detect_hotkey_response resp;
+    dsn::replication::detect_hotkey_request req;
+    std::shared_ptr<replication_ddl_client> ddl_client;
+    pegasus::pegasus_client *pg_client = nullptr;
+};
+
+// Runs all hotkey-detection scenarios in sequence against one onebox instance.
+//
+// Each phase is wrapped in ASSERT_NO_FATAL_FAILURE: a fatal assertion inside a fixture helper
+// only returns from that helper, so without the wrapper the test would print "passed" and
+// carry on with the remaining phases after a failure.
+TEST_F(test_detect_hotspot, write_hotspot_data)
+{
+    std::cout << "start testing write hotspot data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(write_hotspot_data());
+    std::cout << "write hotspot data passed....." << std::endl;
+    std::cout << "start testing write random data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(write_random_data());
+    std::cout << "write random data passed....." << std::endl;
+    std::cout << "start testing max detection time..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(capture_until_maxtime());
+    std::cout << "max detection time passed....." << std::endl;
+    std::cout << "start testing read hotspot data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(read_hotspot_data());
+    std::cout << "read hotspot data passed....." << std::endl;
+    std::cout << "start testing read random data..." << std::endl;
+    ASSERT_NO_FATAL_FAILURE(read_random_data());
+    std::cout << "read random data passed....." << std::endl;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org