You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/09/22 14:55:02 UTC
[incubator-pegasus] branch master updated: feat(hotspot):
calculator auto detect hotkey in the hot partition (#604)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a1f158b feat(hotspot): calculator auto detect hotkey in the hot partition (#604)
a1f158b is described below
commit a1f158b38ba7b82e572efee41bae93f4846913a4
Author: Smilencer <52...@qq.com>
AuthorDate: Tue Sep 22 09:53:51 2020 -0500
feat(hotspot): calculator auto detect hotkey in the hot partition (#604)
---
src/server/hotspot_partition_calculator.cpp | 48 ++++++++++++++++++++++-
src/server/hotspot_partition_calculator.h | 13 +++++-
src/server/test/config.ini | 3 ++
src/server/test/hotspot_partition_test.cpp | 61 +++++++++++++++++++++++++----
4 files changed, 115 insertions(+), 10 deletions(-)
diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index 84a14e7..e0618b3 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -29,6 +29,7 @@
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_read_service.h"
+#include <dsn/utility/fail_point.h>
namespace pegasus {
namespace server {
@@ -42,6 +43,22 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data");
+DSN_DEFINE_bool("pegasus.collector",
+                enable_hotkey_detect,
+                false,
+                "auto detect hot key in the hot partition");
+
+DSN_DEFINE_int32("pegasus.collector",
+                 hot_partition_threshold,
+                 3,
+                 "threshold of hotspot partition value, if app.stat.hotspots >= "
+                 "FLAGS_hot_partition_threshold, this partition is a hot partition");
+
+DSN_DEFINE_int32("pegasus.collector",
+                 occurrence_threshold,
+                 100,
+                 "hot partition occurrence times' threshold to send rpc to detect hotkey");
+
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
@@ -128,15 +145,44 @@ void hotspot_partition_calculator::data_analyse()
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
+ if (!FLAGS_enable_hotkey_detect) {
+ return;
+ }
+ for (int data_type = 0; data_type <= 1; data_type++) {
+ detect_hotkey_in_hotpartition(data_type);
+ }
+}
+
+void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
+{
+    // Count hot rounds per partition and start hotkey detection once the count hits the limit.
+    const bool is_read = (data_type == partition_qps_type::READ_HOTSPOT_DATA);
+    const auto rpc_type =
+        is_read ? dsn::apps::hotkey_type::type::READ : dsn::apps::hotkey_type::type::WRITE;
+    for (size_t index = 0; index < _hot_points.size(); index++) {
+        if (_hot_points[index][data_type].get()->get_value() >= FLAGS_hot_partition_threshold) {
+            // Not reset after sending: the request repeats every round the partition stays hot.
+            if (++_hotpartition_counter[index][data_type] >= FLAGS_occurrence_threshold) {
+                derror_f(
+                    "Find a {} hot partition {}.{}", is_read ? "read" : "write", _app_name, index);
+                send_hotkey_detect_request(
+                    _app_name, index, rpc_type, dsn::apps::hotkey_detect_action::type::START);
+            }
+        } else {
+            // Not hot this round: decay the counter by one, saturating at zero.
+            _hotpartition_counter[index][data_type] =
+                std::max(_hotpartition_counter[index][data_type] - 1, 0);
+        }
+    }
}
-// TODO:(TangYanzhao) call this function to start hotkey detection
/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action)
{
+ FAIL_POINT_INJECT_F("send_hotkey_detect_request", [](dsn::string_view) {});
auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
request->type = hotkey_type;
request->action = action;
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index 2b23ee9..e95dca8 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -17,9 +17,11 @@
#pragma once
-#include "hotspot_partition_stat.h"
#include <gtest/gtest_prod.h>
+
#include <dsn/perf_counter/perf_counter.h>
+#include <dsn/utility/flags.h>
+#include "hotspot_partition_stat.h"
namespace pegasus {
namespace server {
@@ -36,7 +38,7 @@ class hotspot_partition_calculator
{
public:
hotspot_partition_calculator(const std::string &app_name, int partition_count)
- : _app_name(app_name), _hot_points(partition_count)
+ : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
}
@@ -55,6 +57,7 @@ private:
void stat_histories_analyse(int data_type, std::vector<int> &hot_points);
// set hot_point to corresponding perf_counter
void update_hot_point(int data_type, std::vector<int> &hot_points);
+ void detect_hotkey_in_hotpartition(int data_type);
const std::string _app_name;
void init_perf_counter(int perf_counter_count);
@@ -63,6 +66,12 @@ private:
// saving historical data can improve accuracy
stat_histories _partitions_stat_histories;
+    // _hotpartition_counter[index_of_partition][type_of_stat], type_of_stat: read(0) / write(1).
+    // It counts how often each partition's hot point reaches the hot-partition threshold. If the
+    // hot point of a partition stays high, the calculator automatically sends an RPC to its
+    // replica to start hotkey detection.
+ std::vector<std::array<int, 2>> _hotpartition_counter;
+
friend class hotspot_partition_test;
};
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index 5d8084e..f08adb1 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -509,3 +509,6 @@ onebox2 = 2
[pegasus.clusters]
onebox = 0.0.0.0:34701
onebox2 = 0.0.0.0:35701
+
+[pegasus.collector]
+enable_hotkey_detect = true
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index f05a91f..34e74f4 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -19,15 +19,25 @@
#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>
+#include <dsn/utility/fail_point.h>
namespace pegasus {
namespace server {
+DSN_DECLARE_int32(occurrence_threshold);
+
class hotspot_partition_test : public pegasus_server_test_base
{
public:
- hotspot_partition_test() : calculator("TEST", 8){};
+ hotspot_partition_test() : calculator("TEST", 8)
+ {
+ dsn::fail::setup();
+ dsn::fail::cfg("send_hotkey_detect_request", "return()");
+ };
+ ~hotspot_partition_test() { dsn::fail::teardown(); }
+
hotspot_partition_calculator calculator;
+
std::vector<row_data> generate_row_data()
{
std::vector<row_data> test_rows;
@@ -38,6 +48,12 @@ public:
}
return test_rows;
}
+
+ std::vector<std::array<int, 2>> generate_result()
+ {
+ return {{0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}, {0, 0}};
+ }
+
std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
{
std::vector<std::vector<double>> result;
@@ -49,15 +65,28 @@ public:
}
return result;
}
+
void test_policy_in_scenarios(std::vector<row_data> scenario,
- std::vector<std::vector<double>> &expect_result,
- hotspot_partition_calculator &calculator)
+ std::vector<std::vector<double>> &expect_result)
{
calculator.data_aggregate(std::move(scenario));
calculator.data_analyse();
std::vector<std::vector<double>> result = get_calculator_result(calculator._hot_points);
ASSERT_EQ(result, expect_result);
}
+
+ void aggregate_analyse_data(std::vector<row_data> scenario,
+ std::vector<std::array<int, 2>> &expect_result,
+ int loop_times)
+ {
+ for (int i = 0; i < loop_times; i++) {
+ calculator.data_aggregate(scenario);
+ calculator.data_analyse();
+ }
+ ASSERT_EQ(calculator._hotpartition_counter, expect_result);
+ }
+
+ void clear_calculator_histories() { calculator._partitions_stat_histories.clear(); }
};
TEST_F(hotspot_partition_test, hotspot_partition_policy)
@@ -66,7 +95,7 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
std::vector<row_data> test_rows = generate_row_data();
std::vector<std::vector<double>> expect_vector = {{0, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 0}};
- test_policy_in_scenarios(test_rows, expect_vector, calculator);
+ test_policy_in_scenarios(test_rows, expect_vector);
// Insert hotspot scenario_0 data to test
test_rows = generate_row_data();
@@ -75,14 +104,14 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
- test_policy_in_scenarios(test_rows, expect_vector, calculator);
+ test_policy_in_scenarios(test_rows, expect_vector);
// Insert hotspot scenario_0 data to test again
test_rows = generate_row_data();
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
- test_policy_in_scenarios(test_rows, expect_vector, calculator);
+ test_policy_in_scenarios(test_rows, expect_vector);
// Insert hotspot scenario_1 data to test again
test_rows = generate_row_data();
@@ -91,7 +120,25 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}};
- test_policy_in_scenarios(test_rows, expect_vector, calculator);
+ test_policy_in_scenarios(test_rows, expect_vector);
+ clear_calculator_histories();
+}
+
+TEST_F(hotspot_partition_test, send_hotkey_detect_request)
+{
+    const int read_hot = 7;  // partition made read-hot; counted in column 0
+    const int write_hot = 0; // partition made write-hot; counted in column 1
+    auto hot_rows = generate_row_data();
+    hot_rows[read_hot].get_qps = 5000.0;
+    hot_rows[write_hot].put_qps = 5000.0;
+    // Hot for `occurrence_threshold` rounds: both counters climb to the threshold.
+    auto expect = generate_result();
+    expect[read_hot][0] = expect[write_hot][1] = FLAGS_occurrence_threshold;
+    aggregate_analyse_data(hot_rows, expect, FLAGS_occurrence_threshold);
+    // Then `cool_rounds` normal rounds: each one decays both counters by one.
+    const int cool_rounds = 30;
+    expect[read_hot][0] = expect[write_hot][1] = FLAGS_occurrence_threshold - cool_rounds;
+    aggregate_analyse_data(generate_row_data(), expect, cool_rounds);
+}
} // namespace server
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org