You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/09/22 14:55:02 UTC

[incubator-pegasus] branch master updated: feat(hotspot): calculator auto detect hotkey in the hot partition (#604)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f158b  feat(hotspot): calculator auto detect hotkey in the hot partition (#604)
a1f158b is described below

commit a1f158b38ba7b82e572efee41bae93f4846913a4
Author: Smilencer <52...@qq.com>
AuthorDate: Tue Sep 22 09:53:51 2020 -0500

    feat(hotspot): calculator auto detect hotkey in the hot partition (#604)
---
 src/server/hotspot_partition_calculator.cpp | 48 ++++++++++++++++++++++-
 src/server/hotspot_partition_calculator.h   | 13 +++++-
 src/server/test/config.ini                  |  3 ++
 src/server/test/hotspot_partition_test.cpp  | 61 +++++++++++++++++++++++++----
 4 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index 84a14e7..e0618b3 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -29,6 +29,7 @@
 #include <dsn/dist/replication/duplication_common.h>
 #include <dsn/tool-api/task_tracker.h>
 #include "pegasus_read_service.h"
+#include <dsn/utility/fail_point.h>
 
 namespace pegasus {
 namespace server {
@@ -42,6 +43,22 @@ DSN_DEFINE_int64("pegasus.collector",
                  "eliminate outdated historical "
                  "data");
 
+// Flags that control automatic hotkey detection on hot partitions.
+DSN_DEFINE_bool("pegasus.collector",
+                enable_hotkey_detect,
+                false,
+                "auto detect hot key in the hot partition");
+
+DSN_DEFINE_int32("pegasus.collector",
+                 hot_partition_threshold,
+                 3,
+                 "threshold of hotspot partition value, if app.stat.hotspots >= "
+                 "FLAGS_hot_partition_threshold, this partition is a hot partition");
+
+DSN_DEFINE_int32("pegasus.collector",
+                 occurrence_threshold,
+                 100,
+                 "hot partition occurrence times' threshold to send rpc to detect hotkey");
+
 void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
 {
     while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
@@ -128,15 +145,44 @@ void hotspot_partition_calculator::data_analyse()
         stat_histories_analyse(data_type, hot_points);
         update_hot_point(data_type, hot_points);
     }
+    if (!FLAGS_enable_hotkey_detect) {
+        return;
+    }
+    for (int data_type = 0; data_type <= 1; data_type++) {
+        detect_hotkey_in_hotpartition(data_type);
+    }
+}
+
+// Bumps a partition's counter whenever its hot point is at or above
+// FLAGS_hot_partition_threshold (otherwise decays it by one, floored at 0) and sends a START
+// hotkey-detect request whenever the bumped counter is >= FLAGS_occurrence_threshold.
+void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
+{
+    // data_type indexes partition_qps_type; map it to the RPC enum explicitly.
+    const bool is_read = (data_type == partition_qps_type::READ_HOTSPOT_DATA);
+    const auto detect_type =
+        is_read ? dsn::apps::hotkey_type::type::READ : dsn::apps::hotkey_type::type::WRITE;
+    for (size_t index = 0; index < _hot_points.size(); index++) {
+        int &counter = _hotpartition_counter[index][data_type];
+        if (_hot_points[index][data_type].get()->get_value() < FLAGS_hot_partition_threshold) {
+            counter = std::max(counter - 1, 0);
+            continue;
+        }
+        if (++counter >= FLAGS_occurrence_threshold) {
+            derror_f("Find a {} hot partition {}.{}", is_read ? "read" : "write", _app_name, index);
+            send_hotkey_detect_request(
+                _app_name, index, detect_type, dsn::apps::hotkey_detect_action::type::START);
+        }
+    }
 }
 
-// TODO:(TangYanzhao) call this function to start hotkey detection
 /*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
     const std::string &app_name,
     const uint64_t partition_index,
     const dsn::apps::hotkey_type::type hotkey_type,
     const dsn::apps::hotkey_detect_action::type action)
 {
+    FAIL_POINT_INJECT_F("send_hotkey_detect_request", [](dsn::string_view) {});
     auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
     request->type = hotkey_type;
     request->action = action;
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index 2b23ee9..e95dca8 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
-#include "hotspot_partition_stat.h"
 #include <gtest/gtest_prod.h>
+
 #include <dsn/perf_counter/perf_counter.h>
+#include <dsn/utility/flags.h>
+#include "hotspot_partition_stat.h"
 
 namespace pegasus {
 namespace server {
@@ -36,7 +38,7 @@ class hotspot_partition_calculator
 {
 public:
     hotspot_partition_calculator(const std::string &app_name, int partition_count)
-        : _app_name(app_name), _hot_points(partition_count)
+        : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
     {
         init_perf_counter(partition_count);
     }
@@ -55,6 +57,7 @@ private:
     void stat_histories_analyse(int data_type, std::vector<int> &hot_points);
     // set hot_point to corresponding perf_counter
     void update_hot_point(int data_type, std::vector<int> &hot_points);
+    void detect_hotkey_in_hotpartition(int data_type);
 
     const std::string _app_name;
     void init_perf_counter(int perf_counter_count);
@@ -63,6 +66,12 @@ private:
     // saving historical data can improve accuracy
     stat_histories _partitions_stat_histories;
 
+    // _hotpartition_counter[index_of_partitions][type_of_read(0)/write(1)_stat]
+    // It's a counter to find partitions that often exceed the threshold.
+    // If the hot_point of some partitions is always high, the calculator will send an RPC to
+    // detect hotkey on the replica automatically.
+    std::vector<std::array<int, 2>> _hotpartition_counter;
+
     friend class hotspot_partition_test;
 };
 
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index 5d8084e..f08adb1 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -509,3 +509,6 @@ onebox2 = 2
 [pegasus.clusters]
 onebox = 0.0.0.0:34701
 onebox2 = 0.0.0.0:35701
+
+[pegasus.collector]
+enable_hotkey_detect = true
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index f05a91f..34e74f4 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -19,15 +19,25 @@
 
 #include "pegasus_server_test_base.h"
 #include <gtest/gtest.h>
+#include <dsn/utility/fail_point.h>
 
 namespace pegasus {
 namespace server {
 
+DSN_DECLARE_int32(occurrence_threshold);
+
 class hotspot_partition_test : public pegasus_server_test_base
 {
 public:
-    hotspot_partition_test() : calculator("TEST", 8){};
+    // Creates a calculator for app "TEST" with 8 partitions, enables the fail-point
+    // framework and configures the "send_hotkey_detect_request" fail point, which
+    // hotspot_partition_calculator::send_hotkey_detect_request checks on entry --
+    // presumably so these tests never issue a real hotkey-detect RPC (TODO confirm the
+    // exact semantics of "return()" against the fail_point documentation).
+    hotspot_partition_test() : calculator("TEST", 8)
+    {
+        dsn::fail::setup();
+        dsn::fail::cfg("send_hotkey_detect_request", "return()");
+    };
+    // Tears the fail-point framework down when the fixture is destroyed.
+    ~hotspot_partition_test() { dsn::fail::teardown(); }
+
     hotspot_partition_calculator calculator;
+
     std::vector<row_data> generate_row_data()
     {
         std::vector<row_data> test_rows;
@@ -38,6 +48,12 @@ public:
         }
         return test_rows;
     }
+
+    // Builds the all-zero expectation for the calculator's _hotpartition_counter: one
+    // {read, write} counter pair for each of the 8 partitions the fixture's calculator has.
+    std::vector<std::array<int, 2>> generate_result()
+    {
+        const std::array<int, 2> zero_counters{{0, 0}};
+        return std::vector<std::array<int, 2>>(8, zero_counters);
+    }
+
     std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
     {
         std::vector<std::vector<double>> result;
@@ -49,15 +65,28 @@ public:
         }
         return result;
     }
+
     void test_policy_in_scenarios(std::vector<row_data> scenario,
-                                  std::vector<std::vector<double>> &expect_result,
-                                  hotspot_partition_calculator &calculator)
+                                  std::vector<std::vector<double>> &expect_result)
     {
         calculator.data_aggregate(std::move(scenario));
         calculator.data_analyse();
         std::vector<std::vector<double>> result = get_calculator_result(calculator._hot_points);
         ASSERT_EQ(result, expect_result);
     }
+
+    // Feeds `scenario` through data_aggregate() + data_analyse() `loop_times` times, then
+    // asserts that the calculator's _hotpartition_counter equals `expect_result`.
+    // Both inputs are only read, so they are taken by const reference (no per-call copy of
+    // the rows, and temporaries can be passed for either argument).
+    void aggregate_analyse_data(const std::vector<row_data> &scenario,
+                                const std::vector<std::array<int, 2>> &expect_result,
+                                int loop_times)
+    {
+        for (int i = 0; i < loop_times; i++) {
+            calculator.data_aggregate(scenario);
+            calculator.data_analyse();
+        }
+        ASSERT_EQ(calculator._hotpartition_counter, expect_result);
+    }
+
+    // Drops every stat history the calculator has aggregated so far.
+    void clear_calculator_histories() { calculator._partitions_stat_histories.clear(); }
 };
 
 TEST_F(hotspot_partition_test, hotspot_partition_policy)
@@ -66,7 +95,7 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
     std::vector<row_data> test_rows = generate_row_data();
     std::vector<std::vector<double>> expect_vector = {{0, 0, 0, 0, 0, 0, 0, 0},
                                                       {0, 0, 0, 0, 0, 0, 0, 0}};
-    test_policy_in_scenarios(test_rows, expect_vector, calculator);
+    test_policy_in_scenarios(test_rows, expect_vector);
 
     // Insert hotspot scenario_0 data to test
     test_rows = generate_row_data();
@@ -75,14 +104,14 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
     test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
     test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
     expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
-    test_policy_in_scenarios(test_rows, expect_vector, calculator);
+    test_policy_in_scenarios(test_rows, expect_vector);
 
     // Insert hotspot scenario_0 data to test again
     test_rows = generate_row_data();
     test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
     test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
     expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
-    test_policy_in_scenarios(test_rows, expect_vector, calculator);
+    test_policy_in_scenarios(test_rows, expect_vector);
 
     // Insert hotspot scenario_1 data to test again
     test_rows = generate_row_data();
@@ -91,7 +120,25 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
     test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0;
     test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
     expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}};
-    test_policy_in_scenarios(test_rows, expect_vector, calculator);
+    test_policy_in_scenarios(test_rows, expect_vector);
+    clear_calculator_histories();
+}
+
+// Checks how _hotpartition_counter evolves: it should climb to FLAGS_occurrence_threshold
+// while a partition stays hot and fall by one per round once the traffic is back to the
+// unmodified generate_row_data() rows.
+TEST_F(hotspot_partition_test, send_hotkey_detect_request)
+{
+    const int hot_read_idx = 7;
+    const int hot_write_idx = 0;
+    const int cooldown_rounds = 30;
+
+    // Phase 1: replay the same hot rows FLAGS_occurrence_threshold times.
+    std::vector<row_data> hot_rows = generate_row_data();
+    hot_rows[hot_read_idx].get_qps = 5000.0;
+    hot_rows[hot_write_idx].put_qps = 5000.0;
+
+    std::vector<std::array<int, 2>> expected_counters = generate_result();
+    int &expected_read = expected_counters[hot_read_idx][0];
+    int &expected_write = expected_counters[hot_write_idx][1];
+    expected_read = FLAGS_occurrence_threshold;
+    expected_write = FLAGS_occurrence_threshold;
+    aggregate_analyse_data(hot_rows, expected_counters, FLAGS_occurrence_threshold);
+
+    // Phase 2: replay unmodified rows; each counter is expected to drop once per round.
+    expected_read -= cooldown_rounds;
+    expected_write -= cooldown_rounds;
+    aggregate_analyse_data(generate_row_data(), expected_counters, cooldown_rounds);
 }
 
 } // namespace server


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org