You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2021/07/02 09:38:14 UTC

[incubator-pegasus] branch master updated: feat: add app env for user specified compaction (#776)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 624071b  feat: add app env for user specified compaction (#776)
624071b is described below

commit 624071bf470d83dece4929d13c2d7738b511455f
Author: zhao liwei <zl...@163.com>
AuthorDate: Fri Jul 2 17:38:05 2021 +0800

    feat: add app env for user specified compaction (#776)
---
 rdsn                                          |  2 +-
 src/base/pegasus_const.cpp                    |  3 +++
 src/base/pegasus_const.h                      |  2 ++
 src/server/key_ttl_compaction_filter.h        | 13 +++++++++++++
 src/server/pegasus_server_impl.cpp            | 12 ++++++++++++
 src/server/pegasus_server_impl.h              |  4 ++++
 src/server/test/compaction_operation_test.cpp |  4 ++++
 src/server/test/pegasus_server_impl_test.cpp  | 13 +++++++++++++
 8 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/rdsn b/rdsn
index 76577b2..fc224a4 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 76577b2a40e7a94bb4f7e4a0ab567ddef4700aad
+Subproject commit fc224a425058b6cb7972d72541df7ed2456cb6ca
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index da529b8..d78e7b7 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -91,4 +91,7 @@ const std::string
 
 /// true means compaction and scan will validate partition_hash, otherwise false
 const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
+
+/// JSON string that describes the user specified compaction operations
+const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
 } // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index 4f131eb..217171a 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -64,4 +64,6 @@ extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;
 extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
 
 extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
+
+extern const std::string USER_SPECIFIED_COMPACTION;
 } // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 4f26844..c049e83 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -27,6 +27,7 @@
 #include "base/pegasus_utils.h"
 #include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
+#include "compaction_operation.h"
 
 namespace pegasus {
 namespace server {
@@ -132,6 +133,14 @@ public:
     {
         _partition_version.store(partition_version, std::memory_order_release);
     }
+    // Builds compaction operations from `env` and installs them under the write lock.
+    void extract_user_specified_ops(const std::string &env)
+    {
+        auto parsed = create_compaction_operations(env, _pegasus_data_version.load());
+        // `guard` is destroyed before `parsed`, so the replaced operations die unlocked.
+        dsn::utils::auto_write_lock guard(_lock);
+        _user_specified_operations.swap(parsed);
+    }
 
 private:
     std::atomic<uint32_t> _pegasus_data_version;
@@ -140,6 +149,10 @@ private:
     std::atomic<int32_t> _partition_index{0};
     std::atomic<int32_t> _partition_version{-1};
     std::atomic_bool _validate_partition_hash{false};
+
+    dsn::utils::rw_lock_nr _lock; // [
+    compaction_operations _user_specified_operations;
+    // ]
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index ad32583..81134b4 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2341,6 +2341,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
     update_slow_query_threshold(envs);
     update_rocksdb_iteration_threshold(envs);
     update_validate_partition_hash(envs);
+    update_user_specified_compaction(envs);
     _manual_compact_svc.start_manual_compact_if_needed(envs);
 }
 
@@ -2358,6 +2359,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
     update_slow_query_threshold(envs);
     update_rocksdb_iteration_threshold(envs);
     update_validate_partition_hash(envs);
+    update_user_specified_compaction(envs);
     _manual_compact_svc.start_manual_compact_if_needed(envs);
 }
 
@@ -2505,6 +2507,16 @@ void pegasus_server_impl::update_validate_partition_hash(
     }
 }
 
+// Keeps the compaction filter factory in sync with the USER_SPECIFIED_COMPACTION app env.
+//
+// - env present with a value different from the cached one: rebuild the operations from it.
+// - env absent while a value is cached: the env was deleted, so rebuild from an empty string
+//   instead of leaving the stale operations in effect (the create_operations unit test shows
+//   that an empty string yields no operations).
+// - otherwise: nothing to do.
+void pegasus_server_impl::update_user_specified_compaction(
+    const std::map<std::string, std::string> &envs)
+{
+    const auto iter = envs.find(USER_SPECIFIED_COMPACTION);
+    if (iter == envs.end()) {
+        // Only act when something was previously installed; the common case is a no-op.
+        if (dsn_unlikely(!_user_specified_compaction.empty())) {
+            _key_ttl_compaction_filter_factory->extract_user_specified_ops(std::string());
+            _user_specified_compaction.clear();
+        }
+        return;
+    }
+
+    if (dsn_unlikely(iter->second != _user_specified_compaction)) {
+        _key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
+        // Cache the raw string so an unchanged env does not trigger another rebuild.
+        _user_specified_compaction = iter->second;
+    }
+}
+
 bool pegasus_server_impl::parse_compression_types(
     const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
 {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 4a26577..7d23fb8 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -196,6 +196,7 @@ private:
     FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
     FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
     FRIEND_TEST(pegasus_server_impl_test, test_stop_db_twice);
+    FRIEND_TEST(pegasus_server_impl_test, test_update_user_specified_compaction);
 
     friend class pegasus_manual_compact_service;
     friend class pegasus_write_service;
@@ -269,6 +270,8 @@ private:
 
     void update_validate_partition_hash(const std::map<std::string, std::string> &envs);
 
+    void update_user_specified_compaction(const std::map<std::string, std::string> &envs);
+
     // return true if parse compression types 'config' success, otherwise return false.
     // 'compression_per_level' will not be changed if parse failed.
     bool parse_compression_types(const std::string &config,
@@ -376,6 +379,7 @@ private:
     rocksdb::ColumnFamilyOptions _meta_cf_opts;
     rocksdb::ReadOptions _data_cf_rd_opts;
     std::string _usage_scenario;
+    std::string _user_specified_compaction;
 
     rocksdb::DB *_db;
     rocksdb::ColumnFamilyHandle *_data_cf;
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index 8acad74..f4abcb8 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -276,6 +276,10 @@ TEST(compaction_filter_operation_test, create_operations)
     auto expire_ts_rule = static_cast<ttl_range_rule *>(second_operation->rules[2].get());
     ASSERT_EQ(expire_ts_rule->start_ttl, 0);
     ASSERT_EQ(expire_ts_rule->stop_ttl, 2000);
+
+    json = "";
+    operations = create_compaction_operations(json, 1);
+    ASSERT_EQ(operations.size(), 0);
 }
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index df2c515..0f3fab7 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -130,5 +130,18 @@ TEST_F(pegasus_server_impl_test, test_stop_db_twice)
     ASSERT_TRUE(_server->_db == nullptr);
 }
 
+TEST_F(pegasus_server_impl_test, test_update_user_specified_compaction)
+{
+    // Env absent: the cached value must stay as it was.
+    std::map<std::string, std::string> app_envs;
+    _server->_user_specified_compaction = "";
+    _server->update_user_specified_compaction(app_envs);
+    ASSERT_EQ("", _server->_user_specified_compaction);
+    // Env present with a new value: the cached value must follow it.
+    const std::string new_rule = "test";
+    app_envs[USER_SPECIFIED_COMPACTION] = new_rule;
+    _server->update_user_specified_compaction(app_envs);
+    ASSERT_EQ(new_rule, _server->_user_specified_compaction);
+}
 } // namespace server
 } // namespace pegasus

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org