You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/28 06:55:15 UTC

[incubator-pegasus] branch master updated: feat: add creator for compaction operation (#769)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ba5e751  feat: add creator for compaction operation (#769)
ba5e751 is described below

commit ba5e751de1535596a05c16e4d9f1b7fc780ca754
Author: zhao liwei <zl...@163.com>
AuthorDate: Mon Jun 28 14:55:06 2021 +0800

    feat: add creator for compaction operation (#769)
---
 src/server/compaction_filter_rule.cpp         |  7 +-
 src/server/compaction_filter_rule.h           | 11 +--
 src/server/compaction_operation.cpp           | 98 +++++++++++++++++++++++++--
 src/server/compaction_operation.h             | 70 +++++++++++++++++--
 src/server/main.cpp                           |  4 +-
 src/server/test/compaction_operation_test.cpp | 65 ++++++++++++++++--
 src/server/test/main.cpp                      |  4 +-
 7 files changed, 229 insertions(+), 30 deletions(-)

diff --git a/src/server/compaction_filter_rule.cpp b/src/server/compaction_filter_rule.cpp
index 6e37f4b..e414ba8 100644
--- a/src/server/compaction_filter_rule.cpp
+++ b/src/server/compaction_filter_rule.cpp
@@ -69,17 +69,14 @@ bool sortkey_pattern_rule::match(const std::string &hash_key,
     return string_pattern_match(sort_key, match_type, pattern);
 }
 
-ttl_range_rule::ttl_range_rule(uint32_t pegasus_data_version)
-    : pegasus_data_version(pegasus_data_version)
-{
-}
+ttl_range_rule::ttl_range_rule(uint32_t data_version) : data_version(data_version) {}
 
 bool ttl_range_rule::match(const std::string &hash_key,
                            const std::string &sort_key,
                            const rocksdb::Slice &existing_value) const
 {
     uint32_t expire_ts =
-        pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value));
+        pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
     // if start_ttl and stop_ttl = 0, it means we want to delete keys which have no ttl
     if (0 == expire_ts && 0 == start_ttl && 0 == stop_ttl) {
         return true;
diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h
index c6d678c..f55cdaa 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -48,9 +48,9 @@ class compaction_filter_rule
 {
 public:
     template <typename T>
-    static compaction_filter_rule *create(const std::string &params, uint32_t pegasus_data_version)
+    static compaction_filter_rule *create(const std::string &params, uint32_t data_version)
     {
-        T *rule = new T(pegasus_data_version);
+        T *rule = new T(data_version);
         if (!dsn::json::json_forwarder<T>::decode(
                 dsn::blob::create_from_bytes(params.data(), params.size()), *rule)) {
             delete rule;
@@ -108,6 +108,7 @@ private:
     FRIEND_TEST(update_ttl_test, filter);
     FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
     FRIEND_TEST(compaction_filter_rule_test, create);
+    FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
 class sortkey_pattern_rule : public compaction_filter_rule
@@ -127,12 +128,13 @@ private:
     FRIEND_TEST(sortkey_pattern_rule_test, match);
     FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
     FRIEND_TEST(compaction_filter_rule_test, create);
+    FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
 class ttl_range_rule : public compaction_filter_rule
 {
 public:
-    explicit ttl_range_rule(uint32_t pegasus_data_version);
+    explicit ttl_range_rule(uint32_t data_version);
 
     bool match(const std::string &hash_key,
                const std::string &sort_key,
@@ -143,11 +145,12 @@ private:
     // = 0 means no limit
     uint32_t start_ttl;
     uint32_t stop_ttl;
-    uint32_t pegasus_data_version;
+    uint32_t data_version;
 
     FRIEND_TEST(ttl_range_rule_test, match);
     FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
     FRIEND_TEST(compaction_filter_rule_test, create);
+    FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
 void register_compaction_filter_rules();
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 895c75d..86b12c0 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -41,11 +41,15 @@ bool compaction_operation::all_rules_match(const std::string &hash_key,
     return true;
 }
 
-delete_key::delete_key(filter_rules &&rules, uint32_t pegasus_data_version)
-    : compaction_operation(std::move(rules), pegasus_data_version)
+void compaction_operation::set_rules(filter_rules &&tmp_rules) { rules.swap(tmp_rules); }
+
+delete_key::delete_key(filter_rules &&rules, uint32_t data_version)
+    : compaction_operation(std::move(rules), data_version)
 {
 }
 
+delete_key::delete_key(uint32_t data_version) : compaction_operation(data_version) {}
+
 bool delete_key::filter(const std::string &hash_key,
                         const std::string &sort_key,
                         const rocksdb::Slice &existing_value,
@@ -58,11 +62,13 @@ bool delete_key::filter(const std::string &hash_key,
     return true;
 }
 
-update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version)
-    : compaction_operation(std::move(rules), pegasus_data_version)
+update_ttl::update_ttl(filter_rules &&rules, uint32_t data_version)
+    : compaction_operation(std::move(rules), data_version)
 {
 }
 
+update_ttl::update_ttl(uint32_t data_version) : compaction_operation(data_version) {}
+
 bool update_ttl::filter(const std::string &hash_key,
                         const std::string &sort_key,
                         const rocksdb::Slice &existing_value,
@@ -79,8 +85,7 @@ bool update_ttl::filter(const std::string &hash_key,
         new_ts = utils::epoch_now() + value;
         break;
     case update_ttl_op_type::UTOT_FROM_CURRENT: {
-        auto ttl =
-            pegasus_extract_expire_ts(pegasus_data_version, utils::to_string_view(existing_value));
+        auto ttl = pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
         if (ttl == 0) {
             return false;
         }
@@ -97,10 +102,89 @@ bool update_ttl::filter(const std::string &hash_key,
     }
 
     *new_value = existing_value.ToString();
-    pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts);
+    pegasus_update_expire_ts(data_version, *new_value, new_ts);
     *value_changed = true;
     return false;
 }
 
+namespace internal {
+struct json_helper
+{
+    struct user_specified_compaction_rule
+    {
+        filter_rule_type type;
+        std::string params;
+        DEFINE_JSON_SERIALIZATION(type, params)
+    };
+
+    struct user_specified_compaction_op
+    {
+        compaction_operation_type type;
+        std::string params;
+        std::vector<user_specified_compaction_rule> rules;
+        DEFINE_JSON_SERIALIZATION(type, params, rules)
+    };
+
+    // the list of user specified compaction operations (JSON field "ops")
+    std::vector<user_specified_compaction_op> ops;
+    DEFINE_JSON_SERIALIZATION(ops)
+};
+} // namespace internal
+
+std::unique_ptr<compaction_filter_rule> create_compaction_filter_rule(filter_rule_type type,
+                                                                      const std::string &params,
+                                                                      uint32_t data_version)
+{
+    auto rule = dsn::utils::factory_store<compaction_filter_rule>::create(
+        enum_to_string(type), dsn::PROVIDER_TYPE_MAIN, params, data_version);
+    return std::unique_ptr<compaction_filter_rule>(rule);
+}
+
+filter_rules create_compaction_filter_rules(
+    const std::vector<internal::json_helper::user_specified_compaction_rule> &rules,
+    uint32_t data_version)
+{
+    filter_rules res;
+    for (const auto &rule : rules) {
+        auto operation_rule = create_compaction_filter_rule(rule.type, rule.params, data_version);
+        if (operation_rule != nullptr) {
+            res.emplace_back(std::move(operation_rule));
+        }
+    }
+    return res;
+}
+
+compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version)
+{
+    compaction_operations res;
+    internal::json_helper compaction;
+    if (!dsn::json::json_forwarder<internal::json_helper>::decode(
+            dsn::blob::create_from_bytes(json.data(), json.size()), compaction)) {
+        ddebug("invalid user specified compaction format");
+        return res;
+    }
+
+    for (const auto &op : compaction.ops) {
+        filter_rules rules = create_compaction_filter_rules(op.rules, data_version);
+        if (rules.size() == 0) {
+            continue;
+        }
+
+        compaction_operation *operation = dsn::utils::factory_store<compaction_operation>::create(
+            enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
+        if (operation != nullptr) {
+            operation->set_rules(std::move(rules));
+            res.emplace_back(std::unique_ptr<compaction_operation>(operation));
+        }
+    }
+    return res;
+}
+
+void register_compaction_operations()
+{
+    delete_key::register_component<delete_key>(enum_to_string(COT_DELETE));
+    update_ttl::register_component<update_ttl>(enum_to_string(COT_UPDATE_TTL));
+    register_compaction_filter_rules();
+}
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index d4e4285..f1cdf93 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -25,6 +25,18 @@
 
 namespace pegasus {
 namespace server {
+enum compaction_operation_type
+{
+    COT_UPDATE_TTL,
+    COT_DELETE,
+    COT_INVALID,
+};
+ENUM_BEGIN(compaction_operation_type, COT_INVALID)
+ENUM_REG(COT_UPDATE_TTL)
+ENUM_REG(COT_DELETE)
+ENUM_END(compaction_operation_type)
+
+ENUM_TYPE_SERIALIZATION(compaction_operation_type, COT_INVALID)
 
 typedef std::vector<std::unique_ptr<compaction_filter_rule>> filter_rules;
 /** compaction_operation represents the compaction operation. A compaction operation will be
@@ -32,15 +44,30 @@ typedef std::vector<std::unique_ptr<compaction_filter_rule>> filter_rules;
 class compaction_operation
 {
 public:
-    compaction_operation(filter_rules &&rules, uint32_t pegasus_data_version)
-        : rules(std::move(rules)), pegasus_data_version(pegasus_data_version)
+    template <typename T>
+    static compaction_operation *create(const std::string &params, uint32_t data_version)
+    {
+        return T::creator(params, data_version);
+    }
+
+    template <typename T>
+    static void register_component(const char *name)
+    {
+        dsn::utils::factory_store<compaction_operation>::register_factory(
+            name, create<T>, dsn::PROVIDER_TYPE_MAIN);
+    }
+
+    compaction_operation(filter_rules &&rules, uint32_t data_version)
+        : rules(std::move(rules)), data_version(data_version)
     {
     }
+    explicit compaction_operation(uint32_t data_version) : data_version(data_version) {}
     virtual ~compaction_operation() = 0;
 
     bool all_rules_match(const std::string &hash_key,
                          const std::string &sort_key,
                          const rocksdb::Slice &existing_value) const;
+    void set_rules(filter_rules &&rules);
     /**
      * @return false indicates that this key-value should be removed
      * If you want to modify the existing_value, you can pass it back through new_value and
@@ -54,13 +81,19 @@ public:
 
 protected:
     filter_rules rules;
-    uint32_t pegasus_data_version;
+    uint32_t data_version;
 };
 
 class delete_key : public compaction_operation
 {
 public:
-    delete_key(filter_rules &&rules, uint32_t pegasus_data_version);
+    static compaction_operation *creator(const std::string &params, uint32_t data_version)
+    {
+        return new delete_key(data_version);
+    }
+
+    delete_key(filter_rules &&rules, uint32_t data_version);
+    explicit delete_key(uint32_t data_version);
 
     bool filter(const std::string &hash_key,
                 const std::string &sort_key,
@@ -71,6 +104,7 @@ public:
 private:
     FRIEND_TEST(delete_key_test, filter);
     FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
+    FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
 enum update_ttl_op_type
@@ -84,23 +118,49 @@ enum update_ttl_op_type
     UTOT_TIMESTAMP,
     UTOT_INVALID,
 };
+ENUM_BEGIN(update_ttl_op_type, UTOT_INVALID)
+ENUM_REG(UTOT_FROM_NOW)
+ENUM_REG(UTOT_FROM_CURRENT)
+ENUM_REG(UTOT_TIMESTAMP)
+ENUM_END(update_ttl_op_type)
+
+ENUM_TYPE_SERIALIZATION(update_ttl_op_type, UTOT_INVALID)
 
 class update_ttl : public compaction_operation
 {
 public:
-    update_ttl(filter_rules &&rules, uint32_t pegasus_data_version);
+    static compaction_operation *creator(const std::string &params, uint32_t data_version)
+    {
+        update_ttl *operation = new update_ttl(data_version);
+        if (!dsn::json::json_forwarder<update_ttl>::decode(
+                dsn::blob::create_from_bytes(params.data(), params.size()), *operation)) {
+            delete operation;
+            return nullptr;
+        }
+        return operation;
+    }
+
+    update_ttl(filter_rules &&rules, uint32_t data_version);
+    explicit update_ttl(uint32_t data_version);
 
     bool filter(const std::string &hash_key,
                 const std::string &sort_key,
                 const rocksdb::Slice &existing_value,
                 std::string *new_value,
                 bool *value_changed) const;
+    DEFINE_JSON_SERIALIZATION(type, value)
 
 private:
     update_ttl_op_type type;
     uint32_t value;
 
     FRIEND_TEST(update_ttl_test, filter);
+    FRIEND_TEST(compaction_filter_operation_test, creator);
+    FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
+
+typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
+compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
+void register_compaction_operations();
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/main.cpp b/src/server/main.cpp
index d12c0e6..eb3c0a5 100644
--- a/src/server/main.cpp
+++ b/src/server/main.cpp
@@ -21,7 +21,7 @@
 #include "pegasus_service_app.h"
 #include "info_collector_app.h"
 #include "brief_stat.h"
-#include "compaction_filter_rule.h"
+#include "compaction_operation.h"
 
 #include <pegasus/version.h>
 #include <pegasus/git_commit.h>
@@ -88,7 +88,7 @@ void dsn_app_registration_pegasus()
         "server-stat - query selected perf counters",
         "server-stat",
         [](const std::vector<std::string> &args) { return pegasus::get_brief_stat(); });
-    pegasus::server::register_compaction_filter_rules();
+    pegasus::server::register_compaction_operations();
 }
 
 int main(int argc, char **argv)
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index 9303872..8acad74 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -92,13 +92,13 @@ TEST(compaction_filter_operation_test, all_rules_match)
     rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
     rules.push_back(dsn::make_unique<sortkey_pattern_rule>());
     rules.push_back(dsn::make_unique<ttl_range_rule>(data_version));
-    delete_key update_operation(std::move(rules), data_version);
+    delete_key delete_operation(std::move(rules), data_version);
     pegasus_value_generator gen;
     auto now_ts = utils::epoch_now();
     for (const auto &test : tests) {
-        auto hash_rule = static_cast<hashkey_pattern_rule *>(update_operation.rules[0].get());
-        auto sort_rule = static_cast<sortkey_pattern_rule *>(update_operation.rules[1].get());
-        auto ttl_rule = static_cast<ttl_range_rule *>(update_operation.rules[2].get());
+        auto hash_rule = static_cast<hashkey_pattern_rule *>(delete_operation.rules[0].get());
+        auto sort_rule = static_cast<sortkey_pattern_rule *>(delete_operation.rules[1].get());
+        auto ttl_rule = static_cast<ttl_range_rule *>(delete_operation.rules[2].get());
 
         hash_rule->pattern = test.hashkey_pattern;
         hash_rule->match_type = test.hashkey_match_type;
@@ -109,7 +109,7 @@ TEST(compaction_filter_operation_test, all_rules_match)
 
         rocksdb::SliceParts svalue =
             gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
-        ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
+        ASSERT_EQ(delete_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
                   test.all_match);
     }
 
@@ -222,5 +222,60 @@ TEST(update_ttl_test, filter)
         }
     }
 }
+
+TEST(compaction_filter_operation_test, creator)
+{
+    uint32_t data_version = 1;
+    std::string params_json = R"({"type":"UTOT_FROM_CURRENT","value":2000})";
+    update_ttl *update_ttl_op =
+        static_cast<update_ttl *>(update_ttl::creator(params_json, data_version));
+    ASSERT_EQ(update_ttl_op->value, 2000);
+    ASSERT_EQ(update_ttl_op->type, UTOT_FROM_CURRENT);
+    delete update_ttl_op;
+
+    // invalid operation
+    params_json = R"({"type_xxx":"UTOT_FROM_CURRENT","value":2000})";
+    compaction_operation *invalid_op = update_ttl::creator(params_json, data_version);
+    ASSERT_EQ(invalid_op, nullptr);
+    params_json = R"({"type":"UTOT_FROM_CURRENT","value_xxx":2000})";
+    invalid_op = update_ttl::creator(params_json, data_version);
+    ASSERT_EQ(invalid_op, nullptr);
+}
+
+TEST(compaction_filter_operation_test, create_operations)
+{
+    std::string json =
+        "{\"ops\":[{\"type\":\"COT_DELETE\",\"params\":\"\",\"rules\":[{\"type\":\"FRT_HASHKEY_"
+        "PATTERN\",\"params\":\"{\\\"pattern\\\":\\\"hashkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_"
+        "PREFIX\\\"}\"}]},{\"type\":\"COT_UPDATE_TTL\",\"params\":\"{\\\"type\\\":\\\"UTOT_FROM_"
+        "NOW\\\",\\\"value\\\":10000}\",\"rules\":[{\"type\":\"FRT_HASHKEY_PATTERN\","
+        "\"params\":\"{\\\"pattern\\\":\\\"hashkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_"
+        "ANYWHERE\\\"}\"},{\"type\":\"FRT_SORTKEY_PATTERN\",\"params\":\"{\\\"pattern\\\":"
+        "\\\"sortkey\\\",\\\"match_type\\\":\\\"SMT_MATCH_POSTFIX\\\"}\"},{\"type\":\"FRT_"
+        "TTL_RANGE\",\"params\":\"{\\\"start_ttl\\\":0,\\\"stop_ttl\\\":2000}\"}]}]"
+        "}";
+    auto operations = create_compaction_operations(json, 1);
+    ASSERT_EQ(operations.size(), 2);
+
+    auto first_operation = static_cast<delete_key *>(operations.begin()->get());
+    ASSERT_EQ(first_operation->rules.size(), 1);
+    auto hash_rule = static_cast<hashkey_pattern_rule *>(first_operation->rules[0].get());
+    ASSERT_EQ(hash_rule->pattern, "hashkey");
+    ASSERT_EQ(hash_rule->match_type, SMT_MATCH_PREFIX);
+
+    auto second_operation = static_cast<update_ttl *>(operations.rbegin()->get());
+    ASSERT_EQ(second_operation->type, UTOT_FROM_NOW);
+    ASSERT_EQ(second_operation->value, 10000);
+    ASSERT_EQ(second_operation->rules.size(), 3);
+    hash_rule = static_cast<hashkey_pattern_rule *>(second_operation->rules[0].get());
+    ASSERT_EQ(hash_rule->pattern, "hashkey");
+    ASSERT_EQ(hash_rule->match_type, SMT_MATCH_ANYWHERE);
+    auto sort_rule = static_cast<sortkey_pattern_rule *>(second_operation->rules[1].get());
+    ASSERT_EQ(sort_rule->pattern, "sortkey");
+    ASSERT_EQ(sort_rule->match_type, SMT_MATCH_POSTFIX);
+    auto expire_ts_rule = static_cast<ttl_range_rule *>(second_operation->rules[2].get());
+    ASSERT_EQ(expire_ts_rule->start_ttl, 0);
+    ASSERT_EQ(expire_ts_rule->stop_ttl, 2000);
+}
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/main.cpp b/src/server/test/main.cpp
index 985a3b5..56083e1 100644
--- a/src/server/test/main.cpp
+++ b/src/server/test/main.cpp
@@ -20,7 +20,7 @@
 #include <gtest/gtest.h>
 #include <dsn/service_api_cpp.h>
 #include <dsn/dist/replication/replication_service_app.h>
-#include "server/compaction_filter_rule.h"
+#include "server/compaction_operation.h"
 #include "server/pegasus_server_impl.h"
 
 std::atomic_bool gtest_done{false};
@@ -49,7 +49,7 @@ GTEST_API_ int main(int argc, char **argv)
     dsn::replication::replication_app_base::register_storage_engine(
         "pegasus",
         dsn::replication::replication_app_base::create<pegasus::server::pegasus_server_impl>);
-    pegasus::server::register_compaction_filter_rules();
+    pegasus::server::register_compaction_operations();
 
     dsn_run_config("config.ini", false);
     while (!gtest_done) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org