You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/09/16 10:01:11 UTC

[incubator-pegasus] branch master updated: fix: remove user specified compaction if the corresponding appenv was removed (#814)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d5976e2  fix: remove user specified compaction if the corresponding appenv was removed (#814)
d5976e2 is described below

commit d5976e251490681136558793d5cbaf325bdb5ae3
Author: zhao liwei <zl...@163.com>
AuthorDate: Thu Sep 16 18:01:07 2021 +0800

    fix: remove user specified compaction if the corresponding appenv was removed (#814)
---
 src/server/compaction_filter_rule.cpp           | 27 ++++++++++++------------
 src/server/compaction_filter_rule.h             | 25 +++++++++++-----------
 src/server/compaction_operation.cpp             | 22 +++++++++----------
 src/server/compaction_operation.h               | 24 ++++++++++-----------
 src/server/key_ttl_compaction_filter.h          | 28 +++++++++++++++++--------
 src/server/pegasus_server_impl.cpp              |  8 +++++++
 src/server/test/compaction_filter_rule_test.cpp |  8 +++----
 src/server/test/compaction_operation_test.cpp   | 14 ++++++-------
 8 files changed, 85 insertions(+), 71 deletions(-)

diff --git a/src/server/compaction_filter_rule.cpp b/src/server/compaction_filter_rule.cpp
index e414ba8..d373d9f 100644
--- a/src/server/compaction_filter_rule.cpp
+++ b/src/server/compaction_filter_rule.cpp
@@ -27,9 +27,9 @@
 
 namespace pegasus {
 namespace server {
-bool string_pattern_match(const std::string &value,
+bool string_pattern_match(dsn::string_view value,
                           string_match_type type,
-                          const std::string &filter_pattern)
+                          dsn::string_view filter_pattern)
 {
     if (filter_pattern.empty())
         return false;
@@ -38,7 +38,7 @@ bool string_pattern_match(const std::string &value,
 
     switch (type) {
     case string_match_type::SMT_MATCH_ANYWHERE:
-        return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
+        return value.find(filter_pattern) != dsn::string_view::npos;
     case string_match_type::SMT_MATCH_PREFIX:
         return memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
     case string_match_type::SMT_MATCH_POSTFIX:
@@ -53,30 +53,29 @@ bool string_pattern_match(const std::string &value,
 
 hashkey_pattern_rule::hashkey_pattern_rule(uint32_t data_version) {}
 
-bool hashkey_pattern_rule::match(const std::string &hash_key,
-                                 const std::string &sort_key,
-                                 const rocksdb::Slice &existing_value) const
+bool hashkey_pattern_rule::match(dsn::string_view hash_key,
+                                 dsn::string_view sort_key,
+                                 dsn::string_view existing_value) const
 {
     return string_pattern_match(hash_key, match_type, pattern);
 }
 
 sortkey_pattern_rule::sortkey_pattern_rule(uint32_t data_version) {}
 
-bool sortkey_pattern_rule::match(const std::string &hash_key,
-                                 const std::string &sort_key,
-                                 const rocksdb::Slice &existing_value) const
+bool sortkey_pattern_rule::match(dsn::string_view hash_key,
+                                 dsn::string_view sort_key,
+                                 dsn::string_view existing_value) const
 {
     return string_pattern_match(sort_key, match_type, pattern);
 }
 
 ttl_range_rule::ttl_range_rule(uint32_t data_version) : data_version(data_version) {}
 
-bool ttl_range_rule::match(const std::string &hash_key,
-                           const std::string &sort_key,
-                           const rocksdb::Slice &existing_value) const
+bool ttl_range_rule::match(dsn::string_view hash_key,
+                           dsn::string_view sort_key,
+                           dsn::string_view existing_value) const
 {
-    uint32_t expire_ts =
-        pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
+    uint32_t expire_ts = pegasus_extract_expire_ts(data_version, existing_value);
     // if start_ttl and stop_ttl = 0, it means we want to delete keys which have no ttl
     if (0 == expire_ts && 0 == start_ttl && 0 == stop_ttl) {
         return true;
diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h
index f55cdaa..ed038e3 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -19,7 +19,6 @@
 
 #pragma once
 
-#include <rocksdb/slice.h>
 #include <dsn/utility/enum_helper.h>
 #include <dsn/cpp/json_helper.h>
 #include <gtest/gtest.h>
@@ -69,9 +68,9 @@ public:
 
     // TODO(zhaoliwei): we can use `value_filed` to replace existing_value in the later,
     // after the refactor of value schema
-    virtual bool match(const std::string &hash_key,
-                       const std::string &sort_key,
-                       const rocksdb::Slice &existing_value) const = 0;
+    virtual bool match(dsn::string_view hash_key,
+                       dsn::string_view sort_key,
+                       dsn::string_view existing_value) const = 0;
 };
 
 enum string_match_type
@@ -94,9 +93,9 @@ class hashkey_pattern_rule : public compaction_filter_rule
 public:
     hashkey_pattern_rule(uint32_t data_version = VERSION_MAX);
 
-    bool match(const std::string &hash_key,
-               const std::string &sort_key,
-               const rocksdb::Slice &existing_value) const;
+    bool match(dsn::string_view hash_key,
+               dsn::string_view sort_key,
+               dsn::string_view existing_value) const;
     DEFINE_JSON_SERIALIZATION(pattern, match_type)
 
 private:
@@ -116,9 +115,9 @@ class sortkey_pattern_rule : public compaction_filter_rule
 public:
     sortkey_pattern_rule(uint32_t data_version = VERSION_MAX);
 
-    bool match(const std::string &hash_key,
-               const std::string &sort_key,
-               const rocksdb::Slice &existing_value) const;
+    bool match(dsn::string_view hash_key,
+               dsn::string_view sort_key,
+               dsn::string_view existing_value) const;
     DEFINE_JSON_SERIALIZATION(pattern, match_type)
 
 private:
@@ -136,9 +135,9 @@ class ttl_range_rule : public compaction_filter_rule
 public:
     explicit ttl_range_rule(uint32_t data_version);
 
-    bool match(const std::string &hash_key,
-               const std::string &sort_key,
-               const rocksdb::Slice &existing_value) const;
+    bool match(dsn::string_view hash_key,
+               dsn::string_view sort_key,
+               dsn::string_view existing_value) const;
     DEFINE_JSON_SERIALIZATION(start_ttl, stop_ttl)
 
 private:
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 4b81040..a1a629e 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -25,9 +25,9 @@ namespace pegasus {
 namespace server {
 compaction_operation::~compaction_operation() = default;
 
-bool compaction_operation::all_rules_match(const std::string &hash_key,
-                                           const std::string &sort_key,
-                                           const rocksdb::Slice &existing_value) const
+bool compaction_operation::all_rules_match(dsn::string_view hash_key,
+                                           dsn::string_view sort_key,
+                                           dsn::string_view existing_value) const
 {
     if (rules.empty()) {
         return false;
@@ -50,9 +50,9 @@ delete_key::delete_key(filter_rules &&rules, uint32_t data_version)
 
 delete_key::delete_key(uint32_t data_version) : compaction_operation(data_version) {}
 
-bool delete_key::filter(const std::string &hash_key,
-                        const std::string &sort_key,
-                        const rocksdb::Slice &existing_value,
+bool delete_key::filter(dsn::string_view hash_key,
+                        dsn::string_view sort_key,
+                        dsn::string_view existing_value,
                         std::string *new_value,
                         bool *value_changed) const
 {
@@ -69,9 +69,9 @@ update_ttl::update_ttl(filter_rules &&rules, uint32_t data_version)
 
 update_ttl::update_ttl(uint32_t data_version) : compaction_operation(data_version) {}
 
-bool update_ttl::filter(const std::string &hash_key,
-                        const std::string &sort_key,
-                        const rocksdb::Slice &existing_value,
+bool update_ttl::filter(dsn::string_view hash_key,
+                        dsn::string_view sort_key,
+                        dsn::string_view existing_value,
                         std::string *new_value,
                         bool *value_changed) const
 {
@@ -85,7 +85,7 @@ bool update_ttl::filter(const std::string &hash_key,
         new_ts = utils::epoch_now() + value;
         break;
     case update_ttl_op_type::UTOT_FROM_CURRENT: {
-        auto ttl = pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
+        auto ttl = pegasus_extract_expire_ts(data_version, existing_value);
         if (ttl == 0) {
             return false;
         }
@@ -101,7 +101,7 @@ bool update_ttl::filter(const std::string &hash_key,
         return false;
     }
 
-    *new_value = existing_value.ToString();
+    *new_value = std::string(existing_value.data(), existing_value.size());
     pegasus_update_expire_ts(data_version, *new_value, new_ts);
     *value_changed = true;
     return false;
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index e20b3b4..d8bedc6 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -64,18 +64,18 @@ public:
     explicit compaction_operation(uint32_t data_version) : data_version(data_version) {}
     virtual ~compaction_operation() = 0;
 
-    bool all_rules_match(const std::string &hash_key,
-                         const std::string &sort_key,
-                         const rocksdb::Slice &existing_value) const;
+    bool all_rules_match(dsn::string_view hash_key,
+                         dsn::string_view sort_key,
+                         dsn::string_view existing_value) const;
     void set_rules(filter_rules &&rules);
     /**
      * @return false indicates that this key-value should be removed
      * If you want to modify the existing_value, you can pass it back through new_value and
      * value_changed needs to be set to true in this case.
      */
-    virtual bool filter(const std::string &hash_key,
-                        const std::string &sort_key,
-                        const rocksdb::Slice &existing_value,
+    virtual bool filter(dsn::string_view hash_key,
+                        dsn::string_view sort_key,
+                        dsn::string_view existing_value,
                         std::string *new_value,
                         bool *value_changed) const = 0;
 
@@ -95,9 +95,9 @@ public:
     delete_key(filter_rules &&rules, uint32_t data_version);
     explicit delete_key(uint32_t data_version);
 
-    bool filter(const std::string &hash_key,
-                const std::string &sort_key,
-                const rocksdb::Slice &existing_value,
+    bool filter(dsn::string_view hash_key,
+                dsn::string_view sort_key,
+                dsn::string_view existing_value,
                 std::string *new_value,
                 bool *value_changed) const;
 
@@ -143,9 +143,9 @@ public:
     update_ttl(filter_rules &&rules, uint32_t data_version);
     explicit update_ttl(uint32_t data_version);
 
-    bool filter(const std::string &hash_key,
-                const std::string &sort_key,
-                const rocksdb::Slice &existing_value,
+    bool filter(dsn::string_view hash_key,
+                dsn::string_view sort_key,
+                dsn::string_view existing_value,
                 std::string *new_value,
                 bool *value_changed) const;
     DEFINE_JSON_SERIALIZATION(type, value)
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index e3f94a3..d7448a5 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -68,26 +68,31 @@ public:
             return false;
         }
 
-        if (!_user_specified_operations.empty() &&
-            user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
-            return true;
-        }
-
         uint32_t expire_ts =
             pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
         if (_default_ttl != 0 && expire_ts == 0) {
             // should update ttl
+            expire_ts = utils::epoch_now() + _default_ttl;
             *new_value = existing_value.ToString();
-            pegasus_update_expire_ts(
-                _pegasus_data_version, *new_value, utils::epoch_now() + _default_ttl);
+            pegasus_update_expire_ts(_pegasus_data_version, *new_value, expire_ts);
             *value_changed = true;
-            return false;
         }
+
+        if (!_user_specified_operations.empty()) {
+            dsn::string_view value_view = utils::to_string_view(existing_value);
+            if (*value_changed) {
+                value_view = *new_value;
+            }
+            if (user_specified_operation_filter(key, value_view, new_value, value_changed)) {
+                return true;
+            }
+        }
+
         return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
     }
 
     bool user_specified_operation_filter(const rocksdb::Slice &key,
-                                         const rocksdb::Slice &existing_value,
+                                         dsn::string_view existing_value,
                                          std::string *new_value,
                                          bool *value_changed) const
     {
@@ -178,6 +183,11 @@ public:
             _user_specified_operations.swap(operations);
         }
     }
+    void clear_user_specified_ops()
+    {
+        dsn::utils::auto_write_lock l(_lock);
+        _user_specified_operations.clear();
+    }
 
 private:
     std::atomic<uint32_t> _pegasus_data_version;
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 4e8cf69..9375bbe 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2611,9 +2611,17 @@ void pegasus_server_impl::update_user_specified_compaction(
     const std::map<std::string, std::string> &envs)
 {
     auto iter = envs.find(USER_SPECIFIED_COMPACTION);
+    if (dsn_unlikely(iter == envs.end() && _user_specified_compaction != "")) {
+        ddebug_replica("clear user specified compaction coz it was deleted");
+        _key_ttl_compaction_filter_factory->clear_user_specified_ops();
+        _user_specified_compaction = "";
+        return;
+    }
     if (dsn_unlikely(iter != envs.end() && iter->second != _user_specified_compaction)) {
+        ddebug_replica("update user specified compaction coz it was changed");
         _key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
         _user_specified_compaction = iter->second;
+        return;
     }
 }
 
diff --git a/src/server/test/compaction_filter_rule_test.cpp b/src/server/test/compaction_filter_rule_test.cpp
index 40dc6c2..3109012 100644
--- a/src/server/test/compaction_filter_rule_test.cpp
+++ b/src/server/test/compaction_filter_rule_test.cpp
@@ -52,12 +52,11 @@ TEST(hashkey_pattern_rule_test, match)
         {"hashkey", "hashkey", SMT_INVALID, false},
     };
 
-    rocksdb::Slice slice;
     hashkey_pattern_rule rule;
     for (const auto &test : tests) {
         rule.match_type = test.match_type;
         rule.pattern = test.pattern;
-        ASSERT_EQ(rule.match(test.hashkey, "", slice), test.match);
+        ASSERT_EQ(rule.match(test.hashkey, "", ""), test.match);
     }
 }
 
@@ -88,12 +87,11 @@ TEST(sortkey_pattern_rule_test, match)
         {"sortkey", "sortkey", SMT_INVALID, false},
     };
 
-    rocksdb::Slice slice;
     sortkey_pattern_rule rule;
     for (const auto &test : tests) {
         rule.match_type = test.match_type;
         rule.pattern = test.pattern;
-        ASSERT_EQ(rule.match("", test.sortkey, slice), test.match);
+        ASSERT_EQ(rule.match("", test.sortkey, ""), test.match);
     }
 }
 
@@ -128,7 +126,7 @@ TEST(ttl_range_rule_test, match)
         rule.stop_ttl = test.stop_ttl;
         rocksdb::SliceParts svalue =
             gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
-        ASSERT_EQ(rule.match("", "", svalue.parts[0]), test.match);
+        ASSERT_EQ(rule.match("", "", svalue.parts[0].ToString()), test.match);
     }
 }
 
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index f4abcb8..c8ea5d3 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -109,13 +109,14 @@ TEST(compaction_filter_operation_test, all_rules_match)
 
         rocksdb::SliceParts svalue =
             gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
-        ASSERT_EQ(delete_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
+        ASSERT_EQ(delete_operation.all_rules_match(
+                      test.hashkey, test.sortkey, svalue.parts[0].ToString()),
                   test.all_match);
     }
 
     // all_rules_match will return false if there is no rule in this operation
     update_ttl no_rule_operation({}, data_version);
-    ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false);
+    ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", ""), false);
 }
 
 TEST(delete_key_test, filter)
@@ -140,8 +141,7 @@ TEST(delete_key_test, filter)
         auto hash_rule = static_cast<hashkey_pattern_rule *>(delete_operation.rules.begin()->get());
         hash_rule->pattern = test.hashkey_pattern;
         hash_rule->match_type = test.hashkey_match_type;
-        ASSERT_EQ(test.filter,
-                  delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr));
+        ASSERT_EQ(test.filter, delete_operation.filter(test.hashkey, "", "", nullptr, nullptr));
     }
 }
 
@@ -197,9 +197,9 @@ TEST(update_ttl_test, filter)
         bool value_changed = false;
         rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0);
         uint32_t before_ts = utils::epoch_now();
-        ASSERT_EQ(
-            false,
-            update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed));
+        ASSERT_EQ(false,
+                  update_operation.filter(
+                      test.hashkey, "", svalue.parts[0].ToString(), &new_value, &value_changed));
         ASSERT_EQ(test.value_changed, value_changed);
         if (value_changed) {
             uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org