You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2020/11/19 06:02:43 UTC
[incubator-pegasus] branch master updated: feat(hotkey): add unit test of internal_collector (#639)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 737d2ef feat(hotkey): add unit test of internal_collector (#639)
737d2ef is described below
commit 737d2eff55cbf0280f6afb1e7533288e5fda41c5
Author: Smilencer <52...@qq.com>
AuthorDate: Thu Nov 19 14:00:36 2020 +0800
feat(hotkey): add unit test of internal_collector (#639)
---
rdsn | 2 +-
src/server/hotkey_collector.cpp | 5 +-
src/server/hotkey_collector.h | 9 ++
src/server/test/config.ini | 3 +
src/server/test/hotkey_collector_test.cpp | 200 ++++++++++++++++++++++++++++++
5 files changed, 215 insertions(+), 4 deletions(-)
diff --git a/rdsn b/rdsn
index ec47b0c..501fe55 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit ec47b0cfd8cbdc30b69b1d252e59588afb3f0b51
+Subproject commit 501fe552f960b6afa1e1dd4b5d1bf33c87379470
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index c326e72..892e6eb 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -19,7 +19,6 @@
#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
-#include <dsn/utility/flags.h>
#include <boost/functional/hash.hpp>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
@@ -66,7 +65,7 @@ DSN_DEFINE_int32(
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
-static bool
+/*extern*/ bool
find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
{
dcheck_gt(captured_keys.size(), 2);
@@ -102,7 +101,7 @@ find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, in
}
// TODO: (Tangyanzhao) replace it to xxhash
-static int get_bucket_id(dsn::string_view data)
+/*extern*/ int get_bucket_id(dsn::string_view data)
{
size_t hash_value = boost::hash_range(data.begin(), data.end());
return static_cast<int>(hash_value % FLAGS_hotkey_buckets_num);
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index 9d6c3f1..066c7df 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -33,6 +33,10 @@ struct detect_hotkey_result
std::string hot_hash_key;
};
+// Non-static (see hotkey_collector.cpp) so that unit tests can call them directly.
+int get_bucket_id(dsn::string_view data);
+bool find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index);
+
// hotkey_collector is responsible to find the hot keys after the partition
// was detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
@@ -96,6 +100,7 @@ private:
uint64_t _collector_start_time_second;
};
+// Every function in internal_collector_base must be thread-safe.
class internal_collector_base : public dsn::replication::replica_base
{
public:
@@ -128,6 +133,8 @@ private:
hotkey_coarse_data_collector() = delete;
std::vector<std::atomic<uint64_t>> _hash_buckets;
+
+ friend class coarse_collector_test;
};
class hotkey_fine_data_collector : public internal_collector_base
@@ -145,6 +152,8 @@ private:
const uint32_t _target_bucket_index;
// ConcurrentQueue is a lock-free queue to capture keys
moodycamel::ConcurrentQueue<std::pair<dsn::blob, uint64_t>> _capture_key_queue;
+
+ friend class fine_collector_test;
};
} // namespace server
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index cc034aa..ab557e7 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -205,6 +205,9 @@ falcon_host = 127.0.0.1
falcon_port = 1988
falcon_path = /v1/push
+hot_bucket_variance_threshold = 5
+hot_key_variance_threshold = 5
+
[components.pegasus_perf_counter_number_percentile_atomic]
counter_computation_interval_seconds = 10
diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp
new file mode 100644
index 0000000..7c98ed9
--- /dev/null
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "server/hotkey_collector.h"
+
+#include <dsn/utility/rand.h>
+#include <dsn/utility/flags.h>
+#include <dsn/tool-api/task_tracker.h>
+#include "pegasus_server_test_base.h"
+
+namespace pegasus {
+namespace server {
+
+DSN_DECLARE_uint32(hotkey_buckets_num);
+
+// Returns "ThisisahotkeyThisisahotkey" on about `probability`% of calls when `is_hotkey` is
+// true; otherwise returns a random 20-character key drawn from `chars`.
+static std::string generate_hash_key_by_random(bool is_hotkey, int probability = 100)
+{
+    // Compare as int: a negative `probability` must never be promoted to a huge unsigned.
+    if (is_hotkey && static_cast<int>(dsn::rand::next_u32(100)) < probability) {
+        return "ThisisahotkeyThisisahotkey";
+    }
+    static const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                   "1234567890!@#$%^&*()`~-_=+[{]}\\|;:'\",<.>/? ");
+    std::string result;
+    for (int i = 0; i < 20; i++) {
+        result += chars[dsn::rand::next_u32(chars.size())];
+    }
+    return result;
+}
+
+TEST(hotkey_collector_test, get_bucket_id_test)
+{
+    const auto buckets_num = static_cast<int>(FLAGS_hotkey_buckets_num); // compare int with int
+    for (int i = 0; i < 1000000; i++) {
+        auto hash_key = dsn::blob::create_from_bytes(generate_hash_key_by_random(false));
+        const int bucket_id = get_bucket_id(hash_key);
+        ASSERT_TRUE(bucket_id >= 0 && bucket_id < buckets_num) << "bucket_id=" << bucket_id;
+    }
+}
+
+TEST(hotkey_collector_test, find_outlier_index_test)
+{
+    const int threshold = 3;
+    // Seed with -2 (neither an index nor the -1 "not found" value) so checks prove a write.
+    int hot_index = -2;
+
+    bool hot_index_found = find_outlier_index({1, 2, 3}, threshold, hot_index);
+    ASSERT_FALSE(hot_index_found);
+    ASSERT_EQ(hot_index, -1);
+
+    hot_index_found = find_outlier_index({1, 2, 100000}, threshold, hot_index);
+    ASSERT_TRUE(hot_index_found);
+    ASSERT_EQ(hot_index, 2);
+
+    hot_index_found = find_outlier_index({1, 10000, 2, 3, 4, 10000000, 6}, threshold, hot_index);
+    ASSERT_TRUE(hot_index_found);
+    ASSERT_EQ(hot_index, 5);
+
+    hot_index_found = find_outlier_index(
+        {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, threshold, hot_index);
+    ASSERT_FALSE(hot_index_found);
+    ASSERT_EQ(hot_index, -1);
+}
+
+class coarse_collector_test : public pegasus_server_test_base
+{
+public:
+    coarse_collector_test() : coarse_collector(_server.get()) {}
+
+    hotkey_coarse_data_collector coarse_collector;
+
+    // Returns true iff every hash bucket of coarse_collector currently holds zero.
+    bool empty() const
+    {
+        for (const auto &bucket : coarse_collector._hash_buckets) {
+            if (bucket.load() != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    dsn::task_tracker _tracker;
+};
+
+// ~80% of captures carry one hotkey, so a hot bucket must be found; random keys give none.
+TEST_F(coarse_collector_test, coarse_collector)
+{
+    // Enqueues `count` capture tasks and waits for all of them, so analyse_data() never
+    // runs while captures are still pending (which would make the result timing-dependent).
+    auto capture_concurrently = [this](int count, bool is_hotkey) {
+        for (int i = 0; i < count; i++) {
+            dsn::tasking::enqueue(LPC_WRITE, &_tracker, [this, is_hotkey] {
+                dsn::blob hash_key =
+                    dsn::blob::create_from_bytes(generate_hash_key_by_random(is_hotkey, 80));
+                coarse_collector.capture_data(hash_key, 1);
+            });
+        }
+        _tracker.wait_outstanding_tasks();
+    };
+
+    detect_hotkey_result result;
+    capture_concurrently(1000, true);
+    coarse_collector.analyse_data(result);
+    ASSERT_NE(result.coarse_bucket_index, -1);
+
+    coarse_collector.clear();
+    ASSERT_TRUE(empty());
+
+    // Uniformly random keys: no bucket may stand out.
+    capture_concurrently(1000, false);
+    coarse_collector.analyse_data(result);
+    ASSERT_EQ(result.coarse_bucket_index, -1);
+}
+
+class fine_collector_test : public pegasus_server_test_base
+{
+public:
+    const int max_queue_size = 1000;
+    const int target_bucket_index = 0;
+    fine_collector_test() : fine_collector(_server.get(), target_bucket_index, max_queue_size) {}
+
+    hotkey_fine_data_collector fine_collector;
+    // Drains the capture queue and returns how many entries it held; the queue is left empty.
+    int now_queue_size()
+    {
+        int queue_size = 0;
+        std::pair<dsn::blob, uint64_t> key_weight_pair;
+        while (fine_collector._capture_key_queue.try_dequeue(key_weight_pair)) {
+            queue_size++;
+        }
+        return queue_size;
+    }
+
+    dsn::task_tracker _tracker;
+};
+
+TEST_F(fine_collector_test, fine_collector)
+{
+    // Restore the flag on every exit path, including early returns from failed ASSERT_*.
+    struct flag_guard
+    {
+        const uint32_t backup = FLAGS_hotkey_buckets_num;
+        ~flag_guard() { FLAGS_hotkey_buckets_num = backup; }
+    } guard;
+
+    // With one bucket, get_bucket_id() maps every key to bucket 0, the fixture's target.
+    FLAGS_hotkey_buckets_num = 1;
+
+    // Enqueues `count` capture tasks and waits for all of them, so analyse_data() never
+    // runs while captures are still pending (which would make the result timing-dependent).
+    auto capture_concurrently = [this](int count, bool is_hotkey) {
+        for (int i = 0; i < count; i++) {
+            dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [this, is_hotkey] {
+                dsn::blob hash_key =
+                    dsn::blob::create_from_bytes(generate_hash_key_by_random(is_hotkey, 80));
+                fine_collector.capture_data(hash_key, 1);
+            });
+        }
+        _tracker.wait_outstanding_tasks();
+    };
+
+    detect_hotkey_result result;
+    // ~80% of the captured keys are the hotkey, so it must be reported.
+    capture_concurrently(1000, true);
+    fine_collector.analyse_data(result);
+    ASSERT_EQ(result.hot_hash_key, "ThisisahotkeyThisisahotkey");
+
+    fine_collector.clear();
+    ASSERT_EQ(now_queue_size(), 0);
+
+    // Uniformly random keys: no hotkey may be reported.
+    result.hot_hash_key.clear();
+    capture_concurrently(1000, false);
+    fine_collector.analyse_data(result);
+    ASSERT_TRUE(result.hot_hash_key.empty());
+
+    // Flooding the collector must keep the queue below twice max_queue_size.
+    capture_concurrently(5000, true);
+    ASSERT_LT(now_queue_size(), max_queue_size * 2);
+}
+
+} // namespace server
+} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org