You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/09/16 10:01:11 UTC
[incubator-pegasus] branch master updated: fix: remove user specified compaction if the corresponding appenv was removed (#814)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d5976e2 fix: remove user specified compaction if the corresponding appenv was removed (#814)
d5976e2 is described below
commit d5976e251490681136558793d5cbaf325bdb5ae3
Author: zhao liwei <zl...@163.com>
AuthorDate: Thu Sep 16 18:01:07 2021 +0800
fix: remove user specified compaction if the corresponding appenv was removed (#814)
---
src/server/compaction_filter_rule.cpp | 27 ++++++++++++------------
src/server/compaction_filter_rule.h | 25 +++++++++++-----------
src/server/compaction_operation.cpp | 22 +++++++++----------
src/server/compaction_operation.h | 24 ++++++++++-----------
src/server/key_ttl_compaction_filter.h | 28 +++++++++++++++++--------
src/server/pegasus_server_impl.cpp | 8 +++++++
src/server/test/compaction_filter_rule_test.cpp | 8 +++----
src/server/test/compaction_operation_test.cpp | 14 ++++++-------
8 files changed, 85 insertions(+), 71 deletions(-)
diff --git a/src/server/compaction_filter_rule.cpp b/src/server/compaction_filter_rule.cpp
index e414ba8..d373d9f 100644
--- a/src/server/compaction_filter_rule.cpp
+++ b/src/server/compaction_filter_rule.cpp
@@ -27,9 +27,9 @@
namespace pegasus {
namespace server {
-bool string_pattern_match(const std::string &value,
+bool string_pattern_match(dsn::string_view value,
string_match_type type,
- const std::string &filter_pattern)
+ dsn::string_view filter_pattern)
{
if (filter_pattern.empty())
return false;
@@ -38,7 +38,7 @@ bool string_pattern_match(const std::string &value,
switch (type) {
case string_match_type::SMT_MATCH_ANYWHERE:
- return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
+ return value.find(filter_pattern) != dsn::string_view::npos;
case string_match_type::SMT_MATCH_PREFIX:
return memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
case string_match_type::SMT_MATCH_POSTFIX:
@@ -53,30 +53,29 @@ bool string_pattern_match(const std::string &value,
hashkey_pattern_rule::hashkey_pattern_rule(uint32_t data_version) {}
-bool hashkey_pattern_rule::match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const
+bool hashkey_pattern_rule::match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const
{
return string_pattern_match(hash_key, match_type, pattern);
}
sortkey_pattern_rule::sortkey_pattern_rule(uint32_t data_version) {}
-bool sortkey_pattern_rule::match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const
+bool sortkey_pattern_rule::match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const
{
return string_pattern_match(sort_key, match_type, pattern);
}
ttl_range_rule::ttl_range_rule(uint32_t data_version) : data_version(data_version) {}
-bool ttl_range_rule::match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const
+bool ttl_range_rule::match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const
{
- uint32_t expire_ts =
- pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
+ uint32_t expire_ts = pegasus_extract_expire_ts(data_version, existing_value);
// if start_ttl and stop_ttl = 0, it means we want to delete keys which have no ttl
if (0 == expire_ts && 0 == start_ttl && 0 == stop_ttl) {
return true;
diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h
index f55cdaa..ed038e3 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -19,7 +19,6 @@
#pragma once
-#include <rocksdb/slice.h>
#include <dsn/utility/enum_helper.h>
#include <dsn/cpp/json_helper.h>
#include <gtest/gtest.h>
@@ -69,9 +68,9 @@ public:
// TODO(zhaoliwei): we can use `value_filed` to replace existing_value in the later,
// after the refactor of value schema
- virtual bool match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const = 0;
+ virtual bool match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const = 0;
};
enum string_match_type
@@ -94,9 +93,9 @@ class hashkey_pattern_rule : public compaction_filter_rule
public:
hashkey_pattern_rule(uint32_t data_version = VERSION_MAX);
- bool match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const;
+ bool match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(pattern, match_type)
private:
@@ -116,9 +115,9 @@ class sortkey_pattern_rule : public compaction_filter_rule
public:
sortkey_pattern_rule(uint32_t data_version = VERSION_MAX);
- bool match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const;
+ bool match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(pattern, match_type)
private:
@@ -136,9 +135,9 @@ class ttl_range_rule : public compaction_filter_rule
public:
explicit ttl_range_rule(uint32_t data_version);
- bool match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const;
+ bool match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const;
DEFINE_JSON_SERIALIZATION(start_ttl, stop_ttl)
private:
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 4b81040..a1a629e 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -25,9 +25,9 @@ namespace pegasus {
namespace server {
compaction_operation::~compaction_operation() = default;
-bool compaction_operation::all_rules_match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const
+bool compaction_operation::all_rules_match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const
{
if (rules.empty()) {
return false;
@@ -50,9 +50,9 @@ delete_key::delete_key(filter_rules &&rules, uint32_t data_version)
delete_key::delete_key(uint32_t data_version) : compaction_operation(data_version) {}
-bool delete_key::filter(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value,
+bool delete_key::filter(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
@@ -69,9 +69,9 @@ update_ttl::update_ttl(filter_rules &&rules, uint32_t data_version)
update_ttl::update_ttl(uint32_t data_version) : compaction_operation(data_version) {}
-bool update_ttl::filter(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value,
+bool update_ttl::filter(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
@@ -85,7 +85,7 @@ bool update_ttl::filter(const std::string &hash_key,
new_ts = utils::epoch_now() + value;
break;
case update_ttl_op_type::UTOT_FROM_CURRENT: {
- auto ttl = pegasus_extract_expire_ts(data_version, utils::to_string_view(existing_value));
+ auto ttl = pegasus_extract_expire_ts(data_version, existing_value);
if (ttl == 0) {
return false;
}
@@ -101,7 +101,7 @@ bool update_ttl::filter(const std::string &hash_key,
return false;
}
- *new_value = existing_value.ToString();
+ *new_value = std::string(existing_value.data(), existing_value.size());
pegasus_update_expire_ts(data_version, *new_value, new_ts);
*value_changed = true;
return false;
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index e20b3b4..d8bedc6 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -64,18 +64,18 @@ public:
explicit compaction_operation(uint32_t data_version) : data_version(data_version) {}
virtual ~compaction_operation() = 0;
- bool all_rules_match(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value) const;
+ bool all_rules_match(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value) const;
void set_rules(filter_rules &&rules);
/**
* @return false indicates that this key-value should be removed
* If you want to modify the existing_value, you can pass it back through new_value and
* value_changed needs to be set to true in this case.
*/
- virtual bool filter(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value,
+ virtual bool filter(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const = 0;
@@ -95,9 +95,9 @@ public:
delete_key(filter_rules &&rules, uint32_t data_version);
explicit delete_key(uint32_t data_version);
- bool filter(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value,
+ bool filter(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const;
@@ -143,9 +143,9 @@ public:
update_ttl(filter_rules &&rules, uint32_t data_version);
explicit update_ttl(uint32_t data_version);
- bool filter(const std::string &hash_key,
- const std::string &sort_key,
- const rocksdb::Slice &existing_value,
+ bool filter(dsn::string_view hash_key,
+ dsn::string_view sort_key,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const;
DEFINE_JSON_SERIALIZATION(type, value)
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index e3f94a3..d7448a5 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -68,26 +68,31 @@ public:
return false;
}
- if (!_user_specified_operations.empty() &&
- user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
- return true;
- }
-
uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
// should update ttl
+ expire_ts = utils::epoch_now() + _default_ttl;
*new_value = existing_value.ToString();
- pegasus_update_expire_ts(
- _pegasus_data_version, *new_value, utils::epoch_now() + _default_ttl);
+ pegasus_update_expire_ts(_pegasus_data_version, *new_value, expire_ts);
*value_changed = true;
- return false;
}
+
+ if (!_user_specified_operations.empty()) {
+ dsn::string_view value_view = utils::to_string_view(existing_value);
+ if (*value_changed) {
+ value_view = *new_value;
+ }
+ if (user_specified_operation_filter(key, value_view, new_value, value_changed)) {
+ return true;
+ }
+ }
+
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}
bool user_specified_operation_filter(const rocksdb::Slice &key,
- const rocksdb::Slice &existing_value,
+ dsn::string_view existing_value,
std::string *new_value,
bool *value_changed) const
{
@@ -178,6 +183,11 @@ public:
_user_specified_operations.swap(operations);
}
}
+ void clear_user_specified_ops()
+ {
+ dsn::utils::auto_write_lock l(_lock);
+ _user_specified_operations.clear();
+ }
private:
std::atomic<uint32_t> _pegasus_data_version;
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 4e8cf69..9375bbe 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2611,9 +2611,17 @@ void pegasus_server_impl::update_user_specified_compaction(
const std::map<std::string, std::string> &envs)
{
auto iter = envs.find(USER_SPECIFIED_COMPACTION);
+ if (dsn_unlikely(iter == envs.end() && _user_specified_compaction != "")) {
+ ddebug_replica("clear user specified compaction coz it was deleted");
+ _key_ttl_compaction_filter_factory->clear_user_specified_ops();
+ _user_specified_compaction = "";
+ return;
+ }
if (dsn_unlikely(iter != envs.end() && iter->second != _user_specified_compaction)) {
+ ddebug_replica("update user specified compaction coz it was changed");
_key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
_user_specified_compaction = iter->second;
+ return;
}
}
diff --git a/src/server/test/compaction_filter_rule_test.cpp b/src/server/test/compaction_filter_rule_test.cpp
index 40dc6c2..3109012 100644
--- a/src/server/test/compaction_filter_rule_test.cpp
+++ b/src/server/test/compaction_filter_rule_test.cpp
@@ -52,12 +52,11 @@ TEST(hashkey_pattern_rule_test, match)
{"hashkey", "hashkey", SMT_INVALID, false},
};
- rocksdb::Slice slice;
hashkey_pattern_rule rule;
for (const auto &test : tests) {
rule.match_type = test.match_type;
rule.pattern = test.pattern;
- ASSERT_EQ(rule.match(test.hashkey, "", slice), test.match);
+ ASSERT_EQ(rule.match(test.hashkey, "", ""), test.match);
}
}
@@ -88,12 +87,11 @@ TEST(sortkey_pattern_rule_test, match)
{"sortkey", "sortkey", SMT_INVALID, false},
};
- rocksdb::Slice slice;
sortkey_pattern_rule rule;
for (const auto &test : tests) {
rule.match_type = test.match_type;
rule.pattern = test.pattern;
- ASSERT_EQ(rule.match("", test.sortkey, slice), test.match);
+ ASSERT_EQ(rule.match("", test.sortkey, ""), test.match);
}
}
@@ -128,7 +126,7 @@ TEST(ttl_range_rule_test, match)
rule.stop_ttl = test.stop_ttl;
rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
- ASSERT_EQ(rule.match("", "", svalue.parts[0]), test.match);
+ ASSERT_EQ(rule.match("", "", svalue.parts[0].ToString()), test.match);
}
}
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index f4abcb8..c8ea5d3 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -109,13 +109,14 @@ TEST(compaction_filter_operation_test, all_rules_match)
rocksdb::SliceParts svalue =
gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
- ASSERT_EQ(delete_operation.all_rules_match(test.hashkey, test.sortkey, svalue.parts[0]),
+ ASSERT_EQ(delete_operation.all_rules_match(
+ test.hashkey, test.sortkey, svalue.parts[0].ToString()),
test.all_match);
}
// all_rules_match will return false if there is no rule in this operation
update_ttl no_rule_operation({}, data_version);
- ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", rocksdb::Slice()), false);
+ ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", ""), false);
}
TEST(delete_key_test, filter)
@@ -140,8 +141,7 @@ TEST(delete_key_test, filter)
auto hash_rule = static_cast<hashkey_pattern_rule *>(delete_operation.rules.begin()->get());
hash_rule->pattern = test.hashkey_pattern;
hash_rule->match_type = test.hashkey_match_type;
- ASSERT_EQ(test.filter,
- delete_operation.filter(test.hashkey, "", rocksdb::Slice(), nullptr, nullptr));
+ ASSERT_EQ(test.filter, delete_operation.filter(test.hashkey, "", "", nullptr, nullptr));
}
}
@@ -197,9 +197,9 @@ TEST(update_ttl_test, filter)
bool value_changed = false;
rocksdb::SliceParts svalue = gen.generate_value(data_version, "", test.expire_ts, 0);
uint32_t before_ts = utils::epoch_now();
- ASSERT_EQ(
- false,
- update_operation.filter(test.hashkey, "", svalue.parts[0], &new_value, &value_changed));
+ ASSERT_EQ(false,
+ update_operation.filter(
+ test.hashkey, "", svalue.parts[0].ToString(), &new_value, &value_changed));
ASSERT_EQ(test.value_changed, value_changed);
if (value_changed) {
uint32_t new_ts = pegasus_extract_expire_ts(data_version, new_value);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org