You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/28 06:55:15 UTC
[incubator-pegasus] branch master updated: feat: add creator for
compaction operation (#769)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ba5e751 feat: add creator for compaction operation (#769)
ba5e751 is described below
commit ba5e751de1535596a05c16e4d9f1b7fc780ca754
Author: zhao liwei <zl...@163.com>
AuthorDate: Mon Jun 28 14:55:06 2021 +0800
feat: add creator for compaction operation (#769)
---
src/server/compaction_filter_rule.cpp | 7 +-
src/server/compaction_filter_rule.h | 11 +--
src/server/compaction_operation.cpp | 98 +++++++++++++++++++++++++--
src/server/compaction_operation.h | 70 +++++++++++++++++--
src/server/main.cpp | 4 +-
src/server/test/compaction_operation_test.cpp | 65 ++++++++++++++++--
src/server/test/main.cpp | 4 +-
7 files changed, 229 insertions(+), 30 deletions(-)
diff --git a/src/server/compaction_filter_rule.cpp b/src/server/compaction_filter_rule.cpp
index 6e37f4b..e414ba8 100644
--- a/src/server/compaction_filter_rule.cpp
+++ b/src/server/compaction_filter_rule.cpp
@@ -69,17 +69,14 @@ bool sortkey_pattern_rule::match(const std::string &hash_key,
return string_pattern_match(sort_key, match_type, pattern);
}
-ttl_range_rule::ttl_range_rule(uint32_t pegasus_data_version)
- : pegasus_data_version(pegasus_data_version)
-{
-}
+ttl_range_rule::ttl_range_rule(uint32_t data_version) : data_version(data_version) {}
bool ttl_range_rule::match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const
{
uint32_t expire_ts =
- pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value));
+ pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
// if start_ttl and stop_ttl = 0, it means we want to delete keys which have no ttl
if (0 == expire_ts && 0 == start_ttl && 0 == stop_ttl) {
return true;
diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h
index c6d678c..f55cdaa 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -48,9 +48,9 @@ class compaction_filter_rule
{
public:
template <typename T>
- static compaction_filter_rule *create(const std::string &params, uint32_t pegasus_data_version)
+ static compaction_filter_rule *create(const std::string &params, uint32_t data_version)
{
- T *rule = new T(pegasus_data_version);
+ T *rule = new T(data_version);
if (!dsn::json::json_forwarder<T>::decode(
dsn::blob::create_from_bytes(params.data(), params.size()), *rule)) {
delete rule;
@@ -108,6 +108,7 @@ private:
FRIEND_TEST(update_ttl_test, filter);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
FRIEND_TEST(compaction_filter_rule_test, create);
+ FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
class sortkey_pattern_rule : public compaction_filter_rule
@@ -127,12 +128,13 @@ private:
FRIEND_TEST(sortkey_pattern_rule_test, match);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
FRIEND_TEST(compaction_filter_rule_test, create);
+ FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
class ttl_range_rule : public compaction_filter_rule
{
public:
- explicit ttl_range_rule(uint32_t pegasus_data_version);
+ explicit ttl_range_rule(uint32_t data_version);
bool match(const std::string &hash_key,
const std::string &sort_key,
@@ -143,11 +145,12 @@ private:
// = 0 means no limit
uint32_t start_ttl;
uint32_t stop_ttl;
- uint32_t pegasus_data_version;
+ uint32_t data_version;
FRIEND_TEST(ttl_range_rule_test, match);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
FRIEND_TEST(compaction_filter_rule_test, create);
+ FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
void register_compaction_filter_rules();
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 895c75d..86b12c0 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -41,11 +41,15 @@ bool compaction_operation::all_rules_match(const std::string &hash_key,
return true;
}
-delete_key::delete_key(filter_rules &&rules, uint32_t pegasus_data_version)
- : compaction_operation(std::move(rules), pegasus_data_version)
+void compaction_operation::set_rules(filter_rules &&tmp_rules) { rules.swap(tmp_rules); }
+
+delete_key::delete_key(filter_rules &&rules, uint32_t data_version)
+ : compaction_operation(std::move(rules), data_version)
{
}
+delete_key::delete_key(uint32_t data_version) : compaction_operation(data_version) {}
+
bool delete_key::filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
@@ -58,11 +62,13 @@ bool delete_key::filter(const std::string &hash_key,
return true;
}
-update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version)
- : compaction_operation(std::move(rules), pegasus_data_version)
+update_ttl::update_ttl(filter_rules &&rules, uint32_t data_version)
+ : compaction_operation(std::move(rules), data_version)
{
}
+update_ttl::update_ttl(uint32_t data_version) : compaction_operation(data_version) {}
+
bool update_ttl::filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
@@ -79,8 +85,7 @@ bool update_ttl::filter(const std::string &hash_key,
new_ts = utils::epoch_now() + value;
break;
case update_ttl_op_type::UTOT_FROM_CURRENT: {
- auto ttl =
- pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value));
+ auto ttl = pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
if (ttl == 0) {
return false;
}
@@ -97,10 +102,89 @@ bool update_ttl::filter(const std::string &hash_key,
}
*new_value = existing_value.ToString();
- pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts);
+ pegasus_update_expire_ts(data_version, *new_value, new_ts);
*value_changed = true;
return false;
}
+namespace internal {
+struct json_helper
+{
+ struct user_specified_compaction_rule
+ {
+ filter_rule_type type;
+ std::string params;
+ DEFINE_JSON_SERIALIZATION(type, params)
+ };
+
+ struct user_specified_compaction_op
+ {
+ compaction_operation_type type;
+ std::string params;
+ std::vector<user_specified_compaction_rule> rules;
+ DEFINE_JSON_SERIALIZATION(type, params, rules)
+ };
+
+ // key: filter_operation_type
+ std::vector<user_specified_compaction_op> ops;
+ DEFINE_JSON_SERIALIZATION(ops)
+};
+} // namespace internal
+
+std::unique_ptr<compaction_filter_rule> create_compaction_filter_rule(filter_rule_type type,
+ const std::string &params,
+ uint32_t data_version)
+{
+ auto rule = dsn::utils::factory_store<compaction_filter_rule>::create(
+ enum_to_string(type), dsn::PROVIDER_TYPE_MAIN, params, data_version);
+ return std::unique_ptr<compaction_filter_rule>(rule);
+}
+
+filter_rules create_compaction_filter_rules(
+ const std::vector<internal::json_helper::user_specified_compaction_rule> &rules,
+ uint32_t data_version)
+{
+ filter_rules res;
+ for (const auto &rule : rules) {
+ auto operation_rule = create_compaction_filter_rule(rule.type, rule.params, data_version);
+ if (operation_rule != nullptr) {
+ res.emplace_back(std::move(operation_rule));
+ }
+ }
+ return res;
+}
+
+compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version)
+{
+ compaction_operations res;
+ internal::json_helper compaction;
+ if (!dsn::json::json_forwarder<internal::json_helper>::decode(
+ dsn::blob::create_from_bytes(json.data(), json.size()), compaction)) {
+ ddebug("invalid user specified compaction format");
+ return res;
+ }
+
+ for (const auto &op : compaction.ops) {
+ filter_rules rules = create_compaction_filter_rules(op.rules, data_version);
+ if (rules.size() == 0) {
+ continue;
+ }
+
+ compaction_operation *operation = dsn::utils::factory_store<compaction_operation>::create(
+ enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
+ if (operation != nullptr) {
+ operation->set_rules(std::move(rules));
+ res.emplace_back(std::unique_ptr<compaction_operation>(operation));
+ }
+ }
+ return res;
+}
+
+void register_compaction_operations()
+{
+ delete_key::register_component<delete_key>(enum_to_string(COT_DELETE));
+ update_ttl::register_component<update_ttl>(enum_to_string(COT_UPDATE_TTL));
+ register_compaction_filter_rules();
+}
} // namespace server
} // namespace pegasus
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index d4e4285..f1cdf93 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -25,6 +25,18 @@
namespace pegasus {
namespace server {
+enum compaction_operation_type
+{
+ COT_UPDATE_TTL,
+ COT_DELETE,
+ COT_INVALID,
+};
+ENUM_BEGIN(compaction_operation_type, COT_INVALID)
+ENUM_REG(COT_UPDATE_TTL)
+ENUM_REG(COT_DELETE)
+ENUM_END(compaction_operation_type)
+
+ENUM_TYPE_SERIALIZATION(compaction_operation_type, COT_INVALID)
typedef std::vector<std::unique_ptr<compaction_filter_rule>> filter_rules;
/** compaction_operation represents the compaction operation. A compaction operation will be
@@ -32,15 +44,30 @@ typedef std::vector<std::unique_ptr<compaction_filter_rule>> filter_rules;
class compaction_operation
{
public:
- compaction_operation(filter_rules &&rules, uint32_t pegasus_data_version)
- : rules(std::move(rules)), pegasus_data_version(pegasus_data_version)
+ template <typename T>
+ static compaction_operation *create(const std::string &params, uint32_t data_version)
+ {
+ return T::creator(params, data_version);
+ }
+
+ template <typename T>
+ static void register_component(const char *name)
+ {
+ dsn::utils::factory_store<compaction_operation>::register_factory(
+ name, create<T>, dsn::PROVIDER_TYPE_MAIN);
+ }
+
+ compaction_operation(filter_rules &&rules, uint32_t data_version)
+ : rules(std::move(rules)), data_version(data_version)
{
}
+ explicit compaction_operation(uint32_t data_version) : data_version(data_version) {}
virtual ~compaction_operation() = 0;
bool all_rules_match(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value) const;
+ void set_rules(filter_rules &&rules);
/**
* @return false indicates that this key-value should be removed
* If you want to modify the existing_value, you can pass it back through new_value and
@@ -54,13 +81,19 @@ public:
protected:
filter_rules rules;
- uint32_t pegasus_data_version;
+ uint32_t data_version;
};
class delete_key : public compaction_operation
{
public:
- delete_key(filter_rules &&rules, uint32_t pegasus_data_version);
+ static compaction_operation *creator(const std::string &params, uint32_t data_version)
+ {
+ return new delete_key(data_version);
+ }
+
+ delete_key(filter_rules &&rules, uint32_t data_version);
+ explicit delete_key(uint32_t data_version);
bool filter(const std::string &hash_key,
const std::string &sort_key,
@@ -71,6 +104,7 @@ public:
private:
FRIEND_TEST(delete_key_test, filter);
FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
+ FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
enum update_ttl_op_type
@@ -84,23 +118,49 @@ enum update_ttl_op_type
UTOT_TIMESTAMP,
UTOT_INVALID,
};
+ENUM_BEGIN(update_ttl_op_type, UTOT_INVALID)
+ENUM_REG(UTOT_FROM_NOW)
+ENUM_REG(UTOT_FROM_CURRENT)
+ENUM_REG(UTOT_TIMESTAMP)
+ENUM_END(update_ttl_op_type)
+
+ENUM_TYPE_SERIALIZATION(update_ttl_op_type, UTOT_INVALID)
class update_ttl : public compaction_operation
{
public:
- update_ttl(filter_rules &&rules, uint32_t pegasus_data_version);
+ static compaction_operation *creator(const std::string &params, uint32_t data_version)
+ {
+ update_ttl *operation = new update_ttl(data_version);
+ if (!dsn::json::json_forwarder<update_ttl>::decode(
+ dsn::blob::create_from_bytes(params.data(), params.size()), *operation)) {
+ delete operation;
+ return nullptr;
+ }
+ return operation;
+ }
+
+ update_ttl(filter_rules &&rules, uint32_t data_version);
+ explicit update_ttl(uint32_t data_version);
bool filter(const std::string &hash_key,
const std::string &sort_key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const;
+ DEFINE_JSON_SERIALIZATION(type, value)
private:
update_ttl_op_type type;
uint32_t value;
FRIEND_TEST(update_ttl_test, filter);
+ FRIEND_TEST(compaction_filter_operation_test, creator);
+ FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
+
+typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
+compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
+void register_compaction_operations();
} // namespace server
} // namespace pegasus
diff --git a/src/server/main.cpp b/src/server/main.cpp
index d12c0e6..eb3c0a5 100644
--- a/src/server/main.cpp
+++ b/src/server/main.cpp
@@ -21,7 +21,7 @@
#include "pegasus_service_app.h"
#include "info_collector_app.h"
#include "brief_stat.h"
-#include "compaction_filter_rule.h"
+#include "compaction_operation.h"
#include <pegasus/version.h>
#include <pegasus/git_commit.h>
@@ -88,7 +88,7 @@ void dsn_app_registration_pegasus()
"server-stat - query selected perf counters",
"server-stat",
[](const std::vector<std::string> &args) { return pegasus::get_brief_stat(); });
- pegasus::server::register_compaction_filter_rules();
+ pegasus::server::register_compaction_operations();
}
int main(int argc, char **argv)
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index 9303872..8acad74 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -92,13 +92,13 @@ TEST(compaction_filter_operation_test, all_rules_match)
rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
rules.push_back(dsn::make_unique<sortkey_pattern_rule>());
rules.push_back(dsn::make_unique<ttl_range_rule>(data_version));
- delete_key update_operation(std::move(rules), data_version);
+ delete_key delete_operation(std::move(rules), data_version);
pegasus_value_generator gen;
auto now_ts = utils::epoch_now();
for (const auto &test : tests) {
- auto hash_rule = static_cast<hashkey_pattern_rule *>(update_operation.rules[0].get());
- auto sort_rule = static_cast<sortkey_pattern_rule *>(update_operation.rules[1].get());
- auto ttl_rule = static_cast<ttl_range_rule *>(update_operation.rules[2].get());
+ auto hash_rule = static_cast<hashkey_pattern_rule *>(delete_operation.rules[0].get());
+ auto sort_rule = static_cast<sortkey_pattern_rule *>(delete_operation.rules[1].get());
+ auto ttl_rule = static_cast<ttl_range_rule *>(delete_operation.rules[2].get());
hash_rule->pattern = test.hashkey_pattern;
hash_rule->match_type = test.hashkey_match_type;
@@ -109,7 +109,7 @@ TEST(compaction_filter_operation_test, all_rules_match)
rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
- ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
+ ASSERT_EQ(delete_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
test.all_match);
}
@@ -222,5 +222,60 @@ TEST(update_ttl_test, filter)
}
}
}
+
+TEST(compaction_filter_operation_test, creator)
+{
+ uint32_t data_version = 1;
+ std::string params_json = R"({"type":"UTOT_FROM_CURRENT","value":2000})";
+ update_ttl *update_ttl_op =
+ static_cast<update_ttl *>(update_ttl::creator(params_json, data_version));
+ ASSERT_EQ(update_ttl_op->value, 2000);
+ ASSERT_EQ(update_ttl_op->type, UTOT_FROM_CURRENT);
+ delete update_ttl_op;
+
+ // invalid operation
+ params_json = R"({"type_xxx":"UTOT_FROM_CURRENT","value":2000})";
+ compaction_operation *invalid_op = update_ttl::creator(params_json, data_version);
+ ASSERT_EQ(invalid_op, nullptr);
+ params_json = R"({"type":"UTOT_FROM_CURRENT","value_xxx":2000})";
+ invalid_op = update_ttl::creator(params_json, data_version);
+ ASSERT_EQ(invalid_op, nullptr);
+}
+
+TEST(compaction_filter_operation_test, create_operations)
+{
+ std::string json =
+ "{\"ops\":[{\"type\":\"COT_DELETE\",\"params\":\"\",\"rules\":[{\"type\":\"FRT_HASHKEY_"
+ "PATTERN\",\"params\":\"{\\\"pattern\\\":\\\"hashkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_"
+ "PREFIX\\\"}\"}]},{\"type\":\"COT_UPDATE_TTL\",\"params\":\"{\\\"type\\\":\\\"UTOT_FROM_"
+ "NOW\\\",\\\"value\\\":10000}\",\"rules\":[{\"type\":\"FRT_HASHKEY_PATTERN\","
+ "\"params\":\"{\\\"pattern\\\":\\\"hashkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_"
+ "ANYWHERE\\\"}\"},{\"type\":\"FRT_SORTKEY_PATTERN\",\"params\":\"{\\\"pattern\\\":"
+ "\\\"sortkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_POSTFIX\\\"}\"},{\"type\":\"FRT_"
+ "TTL_RANGE\",\"params\":\"{\\\"start_ttl\\\":0,\\\"stop_ttl\\\":2000}\"}]}]"
+ "}";
+ auto operations = create_compaction_operations(json, 1);
+ ASSERT_EQ(operations.size(), 2);
+
+ auto first_operation = static_cast<delete_key *>(operations.begin()->get());
+ ASSERT_EQ(first_operation->rules.size(), 1);
+ auto hash_rule = static_cast<hashkey_pattern_rule *>(first_operation->rules[0].get());
+ ASSERT_EQ(hash_rule->pattern, "hashkey");
+ ASSERT_EQ(hash_rule->match_type, SMT_MATCH_PREFIX);
+
+ auto second_operation = static_cast<update_ttl *>(operations.rbegin()->get());
+ ASSERT_EQ(second_operation->type, UTOT_FROM_NOW);
+ ASSERT_EQ(second_operation->value, 10000);
+ ASSERT_EQ(second_operation->rules.size(), 3);
+ hash_rule = static_cast<hashkey_pattern_rule *>(second_operation->rules[0].get());
+ ASSERT_EQ(hash_rule->pattern, "hashkey");
+ ASSERT_EQ(hash_rule->match_type, SMT_MATCH_ANYWHERE);
+ auto sort_rule = static_cast<sortkey_pattern_rule *>(second_operation->rules[1].get());
+ ASSERT_EQ(sort_rule->pattern, "sortkey");
+ ASSERT_EQ(sort_rule->match_type, SMT_MATCH_POSTFIX);
+ auto expire_ts_rule = static_cast<ttl_range_rule *>(second_operation->rules[2].get());
+ ASSERT_EQ(expire_ts_rule->start_ttl, 0);
+ ASSERT_EQ(expire_ts_rule->stop_ttl, 2000);
+}
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/main.cpp b/src/server/test/main.cpp
index 985a3b5..56083e1 100644
--- a/src/server/test/main.cpp
+++ b/src/server/test/main.cpp
@@ -20,7 +20,7 @@
#include <gtest/gtest.h>
#include <dsn/service_api_cpp.h>
#include <dsn/dist/replication/replication_service_app.h>
-#include "server/compaction_filter_rule.h"
+#include "server/compaction_operation.h"
#include "server/pegasus_server_impl.h"
std::atomic_bool gtest_done{false};
@@ -49,7 +49,7 @@ GTEST_API_ int main(int argc, char **argv)
dsn::replication::replication_app_base::register_storage_engine(
"pegasus",
dsn::replication::replication_app_base::create<pegasus::server::pegasus_server_impl>);
- pegasus::server::register_compaction_filter_rules();
+ pegasus::server::register_compaction_operations();
dsn_run_config("config.ini", false);
while (!gtest_done) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org