You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:49:43 UTC

[GitHub] [incubator-pegasus] acelyc111 commented on a change in pull request #631: feat(hotkey): capture data part3 - declare fine collector

acelyc111 commented on a change in pull request #631:
URL: https://github.com/apache/incubator-pegasus/pull/631#discussion_r516053335



##########
File path: src/server/hotkey_collector.h
##########
@@ -123,5 +126,31 @@ class hotkey_coarse_data_collector : public internal_collector_base
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
+typedef std::vector<moodycamel::ReaderWriterQueue<std::pair<dsn::blob, uint64_t>>>
+    string_capture_queue_vec;

Review comment:
       How about renaming it to `hash_key_request_times`, or something like that?

##########
File path: src/server/hotkey_collector.h
##########
@@ -123,5 +126,31 @@ class hotkey_coarse_data_collector : public internal_collector_base
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
+typedef std::vector<moodycamel::ReaderWriterQueue<std::pair<dsn::blob, uint64_t>>>
+    string_capture_queue_vec;
+
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    hotkey_fine_data_collector() = delete;
+    explicit hotkey_fine_data_collector(replica_base *base,
+                                        dsn::replication::hotkey_type::type hotkey_type,
+                                        int target_bucket_index,
+                                        int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    inline int get_queue_index();
+
+    const dsn::replication::hotkey_type::type _hotkey_type;
+    int _max_queue_size;
+    const int _target_bucket_index;
+    // thread's native id -> data queue id.
+    std::unordered_map<int, int> _thread_queue_map;

Review comment:
       How about renaming it to `thread_id_to_queue_index`?

##########
File path: src/server/hotkey_collector.h
##########
@@ -123,5 +126,31 @@ class hotkey_coarse_data_collector : public internal_collector_base
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
+typedef std::vector<moodycamel::ReaderWriterQueue<std::pair<dsn::blob, uint64_t>>>
+    string_capture_queue_vec;
+
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    hotkey_fine_data_collector() = delete;
+    explicit hotkey_fine_data_collector(replica_base *base,
+                                        dsn::replication::hotkey_type::type hotkey_type,
+                                        int target_bucket_index,
+                                        int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    inline int get_queue_index();
+
+    const dsn::replication::hotkey_type::type _hotkey_type;
+    int _max_queue_size;
+    const int _target_bucket_index;
+    // thread's native id -> data queue id.
+    std::unordered_map<int, int> _thread_queue_map;
+    // Each element in the vector corresponds to a thread, each element is a lock-free queue
+    string_capture_queue_vec _string_capture_queue_vec;

Review comment:
       Better to give it a more meaningful name.

##########
File path: src/server/hotkey_collector.h
##########
@@ -123,5 +126,31 @@ class hotkey_coarse_data_collector : public internal_collector_base
     std::vector<std::atomic<uint64_t>> _hash_buckets;
 };
 
+typedef std::vector<moodycamel::ReaderWriterQueue<std::pair<dsn::blob, uint64_t>>>
+    string_capture_queue_vec;
+
+class hotkey_fine_data_collector : public internal_collector_base
+{
+public:
+    hotkey_fine_data_collector() = delete;
+    explicit hotkey_fine_data_collector(replica_base *base,
+                                        dsn::replication::hotkey_type::type hotkey_type,
+                                        int target_bucket_index,
+                                        int max_queue_size);
+    void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
+    void analyse_data(detect_hotkey_result &result) override;
+
+private:
+    inline int get_queue_index();
+
+    const dsn::replication::hotkey_type::type _hotkey_type;
+    int _max_queue_size;
+    const int _target_bucket_index;
+    // thread's native id -> data queue id.
+    std::unordered_map<int, int> _thread_queue_map;
+    // Each element in the vector corresponds to a thread, each element is a lock-free queue
+    string_capture_queue_vec _string_capture_queue_vec;

Review comment:
       Better to give it a more meaningful name.

##########
File path: src/server/hotkey_collector.cpp
##########
@@ -251,5 +261,111 @@ void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
     }
 }
 
+hotkey_fine_data_collector::hotkey_fine_data_collector(
+    replica_base *base,
+    dsn::replication::hotkey_type::type hotkey_type,
+    int target_bucket_index,
+    int max_queue_size)
+    : internal_collector_base(base),
+      _hotkey_type(hotkey_type),
+      _max_queue_size(max_queue_size),
+      _target_bucket_index(target_bucket_index)
+{
+    // Distinguish between single-threaded and multi-threaded environments
+    if (_hotkey_type == dsn::replication::hotkey_type::READ) {
+
+        auto threads = dsn::get_threadpool_threads_info(THREAD_POOL_LOCAL_APP);
+        int queue_num = threads.size();
+
+        _string_capture_queue_vec.reserve(queue_num);
+        for (int i = 0; i < queue_num; i++) {
+            _thread_queue_map.insert(std::make_pair(threads[i]->native_tid(), i));
+
+            // Create a vector of the ReaderWriterQueue whose size = _max_queue_size
+            _string_capture_queue_vec.emplace_back(_max_queue_size);
+        }
+
+    } else { // WRITE
+        _string_capture_queue_vec.emplace_back(_max_queue_size);
+    }
+}
+
+void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
+{
+    if (get_bucket_id(hash_key) != _target_bucket_index) {
+        return;
+    }
+    _string_capture_queue_vec[get_queue_index()].try_emplace(std::make_pair(hash_key, weight));
+}
+
+struct blob_hash
+{
+    std::size_t operator()(const dsn::blob &str) const
+    {
+        dsn::string_view cp(str);
+        return boost::hash_range(cp.begin(), cp.end());
+    }
+};
+struct blob_equal
+{
+    std::size_t operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
+    {
+        return dsn::string_view(lhs) == dsn::string_view(rhs);
+    }
+};

Review comment:
       Better to move these to where class `dsn::blob` is defined, so that you can reuse them easily.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org