You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2021/07/02 09:38:14 UTC
[incubator-pegasus] branch master updated: feat: add app env for
user specified compaction (#776)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 624071b feat: add app env for user specified compaction (#776)
624071b is described below
commit 624071bf470d83dece4929d13c2d7738b511455f
Author: zhao liwei <zl...@163.com>
AuthorDate: Fri Jul 2 17:38:05 2021 +0800
feat: add app env for user specified compaction (#776)
---
rdsn | 2 +-
src/base/pegasus_const.cpp | 3 +++
src/base/pegasus_const.h | 2 ++
src/server/key_ttl_compaction_filter.h | 13 +++++++++++++
src/server/pegasus_server_impl.cpp | 12 ++++++++++++
src/server/pegasus_server_impl.h | 4 ++++
src/server/test/compaction_operation_test.cpp | 4 ++++
src/server/test/pegasus_server_impl_test.cpp | 13 +++++++++++++
8 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/rdsn b/rdsn
index 76577b2..fc224a4 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 76577b2a40e7a94bb4f7e4a0ab567ddef4700aad
+Subproject commit fc224a425058b6cb7972d72541df7ed2456cb6ca
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index da529b8..d78e7b7 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -91,4 +91,7 @@ const std::string
/// true means compaction and scan will validate partition_hash, otherwise false
const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
+
+/// json string which represents user specified compaction
+const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
} // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index 4f131eb..217171a 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -64,4 +64,6 @@ extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;
extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
+
+extern const std::string USER_SPECIFIED_COMPACTION;
} // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 4f26844..c049e83 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -27,6 +27,7 @@
#include "base/pegasus_utils.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
+#include "compaction_operation.h"
namespace pegasus {
namespace server {
@@ -132,6 +133,14 @@ public:
{
_partition_version.store(partition_version, std::memory_order_release);
}
+ void extract_user_specified_ops(const std::string &env)
+ {
+ auto operations = create_compaction_operations(env, _pegasus_data_version.load());
+ {
+ dsn::utils::auto_write_lock l(_lock);
+ _user_specified_operations.swap(operations);
+ }
+ }
private:
std::atomic<uint32_t> _pegasus_data_version;
@@ -140,6 +149,10 @@ private:
std::atomic<int32_t> _partition_index{0};
std::atomic<int32_t> _partition_version{-1};
std::atomic_bool _validate_partition_hash{false};
+
+ dsn::utils::rw_lock_nr _lock; // [
+ compaction_operations _user_specified_operations;
+ // ]
};
} // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index ad32583..81134b4 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2341,6 +2341,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
+ update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
@@ -2358,6 +2359,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
+ update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
@@ -2505,6 +2507,16 @@ void pegasus_server_impl::update_validate_partition_hash(
}
}
+void pegasus_server_impl::update_user_specified_compaction(
+ const std::map<std::string, std::string> &envs)
+{
+ auto iter = envs.find(USER_SPECIFIED_COMPACTION);
+ if (dsn_unlikely(iter != envs.end() && iter->second != _user_specified_compaction)) {
+ _key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
+ _user_specified_compaction = iter->second;
+ }
+}
+
bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 4a26577..7d23fb8 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -196,6 +196,7 @@ private:
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
FRIEND_TEST(pegasus_server_impl_test, test_stop_db_twice);
+ FRIEND_TEST(pegasus_server_impl_test, test_update_user_specified_compaction);
friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
@@ -269,6 +270,8 @@ private:
void update_validate_partition_hash(const std::map<std::string, std::string> &envs);
+ void update_user_specified_compaction(const std::map<std::string, std::string> &envs);
+
// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
@@ -376,6 +379,7 @@ private:
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
+ std::string _user_specified_compaction;
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index 8acad74..f4abcb8 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -276,6 +276,10 @@ TEST(compaction_filter_operation_test, create_operations)
auto expire_ts_rule = static_cast<ttl_range_rule *>(second_operation->rules[2].get());
ASSERT_EQ(expire_ts_rule->start_ttl, 0);
ASSERT_EQ(expire_ts_rule->stop_ttl, 2000);
+
+ json = "";
+ operations = create_compaction_operations(json, 1);
+ ASSERT_EQ(operations.size(), 0);
}
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index df2c515..0f3fab7 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -130,5 +130,18 @@ TEST_F(pegasus_server_impl_test, test_stop_db_twice)
ASSERT_TRUE(_server->_db == nullptr);
}
+TEST_F(pegasus_server_impl_test, test_update_user_specified_compaction)
+{
+ _server->_user_specified_compaction = "";
+ std::map<std::string, std::string> envs;
+
+ _server->update_user_specified_compaction(envs);
+ ASSERT_EQ("", _server->_user_specified_compaction);
+
+ std::string user_specified_compaction = "test";
+ envs[USER_SPECIFIED_COMPACTION] = user_specified_compaction;
+ _server->update_user_specified_compaction(envs);
+ ASSERT_EQ(user_specified_compaction, _server->_user_specified_compaction);
+}
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org