You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2020/11/19 06:02:43 UTC

[incubator-pegasus] branch master updated: feat(hotkey): add unit test of internal_collector (#639)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 737d2ef  feat(hotkey): add unit test of internal_collector (#639)
737d2ef is described below

commit 737d2eff55cbf0280f6afb1e7533288e5fda41c5
Author: Smilencer <52...@qq.com>
AuthorDate: Thu Nov 19 14:00:36 2020 +0800

    feat(hotkey): add unit test of internal_collector (#639)
---
 rdsn                                      |   2 +-
 src/server/hotkey_collector.cpp           |   5 +-
 src/server/hotkey_collector.h             |   9 ++
 src/server/test/config.ini                |   3 +
 src/server/test/hotkey_collector_test.cpp | 200 ++++++++++++++++++++++++++++++
 5 files changed, 215 insertions(+), 4 deletions(-)

diff --git a/rdsn b/rdsn
index ec47b0c..501fe55 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit ec47b0cfd8cbdc30b69b1d252e59588afb3f0b51
+Subproject commit 501fe552f960b6afa1e1dd4b5d1bf33c87379470
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index c326e72..892e6eb 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -19,7 +19,6 @@
 
 #include <dsn/dist/replication/replication_enums.h>
 #include <dsn/utility/smart_pointers.h>
-#include <dsn/utility/flags.h>
 #include <boost/functional/hash.hpp>
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/flags.h>
@@ -66,7 +65,7 @@ DSN_DEFINE_int32(
     "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
 
 // 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
-static bool
+/*extern*/ bool
 find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
 {
     dcheck_gt(captured_keys.size(), 2);
@@ -102,7 +101,7 @@ find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, in
 }
 
 // TODO: (Tangyanzhao) replace it to xxhash
-static int get_bucket_id(dsn::string_view data)
+/*extern*/ int get_bucket_id(dsn::string_view data)
 {
     size_t hash_value = boost::hash_range(data.begin(), data.end());
     return static_cast<int>(hash_value % FLAGS_hotkey_buckets_num);
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index 9d6c3f1..066c7df 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -33,6 +33,10 @@ struct detect_hotkey_result
     std::string hot_hash_key;
 };
 
+// Hashes `data` and maps it to a bucket index in [0, FLAGS_hotkey_buckets_num).
+int get_bucket_id(dsn::string_view data);
+// Looks for a single element of `captured_keys` that stands out from the rest
+// (68-95-99.7 rule; `threshold` is presumably in standard deviations -- confirm).
+// On success returns true and stores that element's position in `hot_index`;
+// otherwise returns false and sets `hot_index` to -1.
+// Requires captured_keys.size() > 2 (checked with dcheck_gt).
+bool find_outlier_index(const std::vector<uint64_t> &captured_keys,
+                        int threshold,
+                        int &hot_index);
+
 //    hotkey_collector is responsible to find the hot keys after the partition
 //    was detected to be hot. The two types of hotkey, READ & WRITE, are detected
 //    separately.
@@ -96,6 +100,7 @@ private:
     uint64_t _collector_start_time_second;
 };
 
+// Every function in internal_collector_base must be thread-safe.
 class internal_collector_base : public dsn::replication::replica_base
 {
 public:
@@ -128,6 +133,8 @@ private:
     hotkey_coarse_data_collector() = delete;
 
     std::vector<std::atomic<uint64_t>> _hash_buckets;
+
+    friend class coarse_collector_test;
 };
 
 class hotkey_fine_data_collector : public internal_collector_base
@@ -145,6 +152,8 @@ private:
     const uint32_t _target_bucket_index;
     // ConcurrentQueue is a lock-free queue to capture keys
     moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> _capture_key_queue;
+
+    friend class fine_collector_test;
 };
 
 } // namespace server
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index cc034aa..ab557e7 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -205,6 +205,9 @@ falcon_host = 127.0.0.1
 falcon_port = 1988
 falcon_path = /v1/push
 
+hot_bucket_variance_threshold = 5
+hot_key_variance_threshold = 5
+
 [components.pegasus_perf_counter_number_percentile_atomic]
 counter_computation_interval_seconds = 10
 
diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp
new file mode 100644
index 0000000..7c98ed9
--- /dev/null
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "server/hotkey_collector.h"
+
+#include <dsn/utility/rand.h>
+#include <dsn/utility/flags.h>
+#include <dsn/tool-api/task_tracker.h>
+#include "pegasus_server_test_base.h"
+
+namespace pegasus {
+namespace server {
+
+// Number of hash buckets used by get_bucket_id() (it takes the key hash modulo
+// this flag). Presumably defined in hotkey_collector.cpp -- confirm. The
+// fine_collector test temporarily overrides it to 1.
+DSN_DECLARE_uint32(hotkey_buckets_num);
+
+// Returns a random 20-character hash key drawn from a mixed printable charset.
+// When `is_hotkey` is true, the fixed hot key is returned instead for draws of
+// dsn::rand::next_u32(100) below `probability`, i.e. roughly `probability`
+// percent of the time (assuming next_u32(n) is uniform in [0, n) -- confirm).
+static std::string generate_hash_key_by_random(bool is_hotkey, int probability = 100)
+{
+    // Compare as signed ints: the draw is < 100, and a negative `probability`
+    // must mean "never hot" rather than wrapping to a huge unsigned value.
+    if (is_hotkey && static_cast<int>(dsn::rand::next_u32(100)) < probability) {
+        return "ThisisahotkeyThisisahotkey";
+    }
+    static const std::string chars("abcdefghijklmnopqrstuvwxyz"
+                                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                   "1234567890"
+                                   "!@#$%^&*()"
+                                   "`~-_=+[{]}\\|;:'\",<.>/? ");
+    constexpr int random_key_length = 20;
+    std::string result;
+    result.reserve(random_key_length);
+    for (int i = 0; i < random_key_length; i++) {
+        result += chars[dsn::rand::next_u32(chars.size())];
+    }
+    return result;
+}
+
+// get_bucket_id() must always map a key into [0, FLAGS_hotkey_buckets_num).
+TEST(hotkey_collector_test, get_bucket_id_test)
+{
+    // The flag is unsigned (DSN_DECLARE_uint32); compare against an int copy so
+    // the gtest comparison is signed-vs-signed (no -Wsign-compare).
+    const int buckets_num = static_cast<int>(FLAGS_hotkey_buckets_num);
+    for (int i = 0; i < 1000000; i++) {
+        const int bucket_id =
+            get_bucket_id(dsn::blob::create_from_bytes(generate_hash_key_by_random(false)));
+        ASSERT_GE(bucket_id, 0);
+        ASSERT_LT(bucket_id, buckets_num);
+    }
+}
+
+// find_outlier_index() must report a single dominant element (and its index)
+// and must report "no outlier" with hot_index == -1 otherwise.
+TEST(hotkey_collector_test, find_outlier_index_test)
+{
+    const int threshold = 3;
+    // Reset hot_index to a sentinel that is neither -1 nor a valid index before
+    // every call, so each ASSERT_EQ proves find_outlier_index wrote the value
+    // (previously the first check read an uninitialized int).
+    const int unset = -2;
+    int hot_index = unset;
+
+    ASSERT_FALSE(find_outlier_index({1, 2, 3}, threshold, hot_index));
+    ASSERT_EQ(hot_index, -1);
+
+    hot_index = unset;
+    ASSERT_TRUE(find_outlier_index({1, 2, 100000}, threshold, hot_index));
+    ASSERT_EQ(hot_index, 2);
+
+    hot_index = unset;
+    ASSERT_TRUE(find_outlier_index({1, 10000, 2, 3, 4, 10000000, 6}, threshold, hot_index));
+    ASSERT_EQ(hot_index, 5);
+
+    hot_index = unset;
+    ASSERT_FALSE(find_outlier_index(
+        {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, threshold, hot_index));
+    ASSERT_EQ(hot_index, -1);
+}
+
+// Fixture owning a hotkey_coarse_data_collector built on the test server.
+// It is a friend of the collector, so empty() can inspect the private buckets.
+class coarse_collector_test : public pegasus_server_test_base
+{
+public:
+    coarse_collector_test() : coarse_collector(_server.get()) {}
+
+    hotkey_coarse_data_collector coarse_collector;
+
+    // Returns true iff every hash-bucket counter is zero.
+    bool empty()
+    {
+        for (const auto &bucket : coarse_collector._hash_buckets) {
+            if (bucket.load() != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Tracks the capture tasks enqueued by tests so they can be waited on.
+    dsn::task_tracker _tracker;
+};
+
+// Skewed traffic (~80% one key) must produce a hot bucket; uniformly random
+// traffic must not. All capture tasks are drained BEFORE analyse_data():
+// analysing first raced with the enqueued tasks. In that order the positive
+// case could see no captures at all (flaky), and the negative case passed
+// vacuously. A failing ASSERT also returned while tasks still referenced
+// this frame.
+TEST_F(coarse_collector_test, coarse_collector)
+{
+    detect_hotkey_result result;
+
+    for (int i = 0; i < 1000; i++) {
+        dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
+            dsn::blob hash_key =
+                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
+            coarse_collector.capture_data(hash_key, 1);
+        });
+    }
+    _tracker.wait_outstanding_tasks();
+    coarse_collector.analyse_data(result);
+    ASSERT_NE(result.coarse_bucket_index, -1);
+
+    coarse_collector.clear();
+    ASSERT_TRUE(empty());
+
+    for (int i = 0; i < 1000; i++) {
+        dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
+            dsn::blob hash_key = dsn::blob::create_from_bytes(generate_hash_key_by_random(false));
+            coarse_collector.capture_data(hash_key, 1);
+        });
+    }
+    _tracker.wait_outstanding_tasks();
+    coarse_collector.analyse_data(result);
+    ASSERT_EQ(result.coarse_bucket_index, -1);
+}
+
+// Fixture owning a hotkey_fine_data_collector built on the test server. It
+// watches bucket `target_bucket_index` and is given `max_queue_size`, which
+// the tests treat as an approximate cap on queued keys. It is a friend of
+// the collector, so the queue can be drained here.
+class fine_collector_test : public pegasus_server_test_base
+{
+public:
+    // Declared before fine_collector, so both are initialized before it is built.
+    const int max_queue_size = 1000;
+    const int target_bucket_index = 0;
+    fine_collector_test()
+        : fine_collector(_server.get(), target_bucket_index, max_queue_size)
+    {
+    }
+
+    hotkey_fine_data_collector fine_collector;
+
+    // Drains the capture queue and returns how many entries it held.
+    // NOTE: destructive -- the queue is empty afterwards.
+    int now_queue_size()
+    {
+        int queue_size = 0;
+        std::pair<dsn::blob, uint64_t> key_weight_pair;
+        while (fine_collector._capture_key_queue.try_dequeue(key_weight_pair)) {
+            queue_size++;
+        }
+        return queue_size;
+    }
+
+    // Tracks the capture tasks enqueued by tests so they can be waited on.
+    dsn::task_tracker _tracker;
+};
+
+// With a single bucket, every key lands in the watched bucket. The test checks
+// three things:
+//   - skewed traffic must yield the hot key;
+//   - uniformly random traffic must not;
+//   - the capture queue must stay roughly bounded by max_queue_size.
+// All capture tasks are drained BEFORE analyse_data(), so the analysis sees
+// every capture instead of racing with the enqueued tasks.
+TEST_F(fine_collector_test, fine_collector)
+{
+    // Force every key into bucket 0 for this test only. Restoring from a
+    // destructor guarantees a failing ASSERT_* (which returns early) cannot
+    // leak the override into later tests.
+    struct buckets_num_restorer
+    {
+        decltype(FLAGS_hotkey_buckets_num) backup;
+        ~buckets_num_restorer() { FLAGS_hotkey_buckets_num = backup; }
+    } restorer{FLAGS_hotkey_buckets_num};
+    FLAGS_hotkey_buckets_num = 1;
+    detect_hotkey_result result;
+
+    for (int i = 0; i < 1000; i++) {
+        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
+            dsn::blob hash_key =
+                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
+            fine_collector.capture_data(hash_key, 1);
+        });
+    }
+    _tracker.wait_outstanding_tasks();
+    fine_collector.analyse_data(result);
+    ASSERT_EQ(result.hot_hash_key, "ThisisahotkeyThisisahotkey");
+
+    fine_collector.clear();
+    ASSERT_EQ(now_queue_size(), 0);
+
+    result.hot_hash_key.clear();
+    for (int i = 0; i < 1000; i++) {
+        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
+            dsn::blob hash_key = dsn::blob::create_from_bytes(generate_hash_key_by_random(false));
+            fine_collector.capture_data(hash_key, 1);
+        });
+    }
+    _tracker.wait_outstanding_tasks();
+    fine_collector.analyse_data(result);
+    ASSERT_TRUE(result.hot_hash_key.empty());
+
+    // Flood the collector; concurrent captures may overshoot the cap a little,
+    // hence the 2x slack.
+    for (int i = 0; i < 5000; i++) {
+        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
+            dsn::blob hash_key =
+                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
+            fine_collector.capture_data(hash_key, 1);
+        });
+    }
+    _tracker.wait_outstanding_tasks();
+    ASSERT_LT(now_queue_size(), max_queue_size * 2);
+}
+
+} // namespace server
+} // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org