You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/10/16 02:26:21 UTC
[incubator-pegasus] branch master updated: feat(hotkey): capture data part1 - declare internal collector (#618)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 1461dc5 feat(hotkey): capture data part1 - declare internal collector (#618)
1461dc5 is described below
commit 1461dc55035d03552e62a1d9ffa7ac27cf9e1423
Author: Smilencer <52...@qq.com>
AuthorDate: Fri Oct 16 10:26:10 2020 +0800
feat(hotkey): capture data part1 - declare internal collector (#618)
---
src/server/capacity_unit_calculator.cpp | 2 +-
src/server/hotkey_collector.cpp | 20 ++++++++++++++++++--
src/server/hotkey_collector.h | 20 ++++++++++++++++++++
src/server/pegasus_server_impl.cpp | 16 ++++++++++++++++
4 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 6a055e6..07af04f 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -217,7 +217,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
}
_pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
uint64_t key_count = kvs.size();
- _write_hotkey_collector->capture_raw_key(hash_key, key_count);
+ _write_hotkey_collector->capture_hash_key(hash_key, key_count);
if (status != rocksdb::Status::kOk) {
return;
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index c05e33c..cb3d1c9 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -17,9 +17,17 @@
#include "hotkey_collector.h"
+#include <dsn/utility/smart_pointers.h>
+#include "base/pegasus_key_schema.h"
+
namespace pegasus {
namespace server {
+hotkey_collector::hotkey_collector()
+ : _internal_collector(dsn::make_unique<hotkey_empty_data_collector>())
+{
+}
+
// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
@@ -28,10 +36,18 @@ void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request
void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
{
- // TODO: (Tangyanzhao) Add a judgment sentence to check if it is a raw key
+ dsn::blob hash_key, sort_key;
+ pegasus_restore_key(raw_key, hash_key, sort_key);
+ capture_hash_key(hash_key, weight);
+}
+
+void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight)
+{
+ // TODO: (Tangyanzhao) add a unit test to ensure data integrity
+ _internal_collector->capture_data(hash_key, weight);
}
-void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight) {}
+void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
} // namespace server
} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index a1818d3..5c092dc 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -24,6 +24,8 @@
namespace pegasus {
namespace server {
+class internal_collector_base;
+
// hotkey_collector is responsible to find the hot keys after the partition
// was detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
@@ -64,16 +66,34 @@ namespace server {
class hotkey_collector
{
public:
+ hotkey_collector();
// TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
// weight: calculate the weight according to the specific situation
void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
+ void analyse_data();
void handle_rpc(const dsn::replication::detect_hotkey_request &req,
/*out*/ dsn::replication::detect_hotkey_response &resp);
private:
+ std::unique_ptr<internal_collector_base> _internal_collector;
std::atomic<hotkey_collector_state> _state;
};
+class internal_collector_base
+{
+public:
+ virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
+ virtual void analyse_data() = 0;
+};
+
+// used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
+class hotkey_empty_data_collector : public internal_collector_base
+{
+public:
+ void capture_data(const dsn::blob &hash_key, uint64_t size) {}
+ void analyse_data() {}
+};
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index cdd5b35..e565d53 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -15,6 +15,7 @@
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>
+#include <dsn/utility/flags.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
@@ -31,6 +32,11 @@ namespace server {
DEFINE_TASK_CODE(LPC_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
+DSN_DEFINE_int32("pegasus.server",
+ hotkey_analyse_time_interval_s,
+ 10,
+ "hotkey analyse interval in seconds");
+
static std::string chkpt_get_dir_name(int64_t decree)
{
char buffer[256];
@@ -1490,6 +1496,16 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
this, _read_hotkey_collector, _write_hotkey_collector);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
+ ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+ &_tracker,
+ [this]() { _read_hotkey_collector->analyse_data(); },
+ std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+
+ ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+ &_tracker,
+ [this]() { _write_hotkey_collector->analyse_data(); },
+ std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+
return ::dsn::ERR_OK;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org