You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2020/09/02 02:00:17 UTC

[incubator-pegasus] branch v2.1 updated (339ef34 -> 41d9015)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a change to branch v2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git.


    from 339ef34  feat(bulk-load): add bulk load function test (#568)
     new eafd125  chore: add disclaimer for not compliant with asf policy (#589)
     new 41d9015  fix: Load options from file when open an exist DB (#587)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 DISCLAIMER-WIP                               | 19 ++++++
 NOTICE                                       |  5 ++
 rdsn                                         |  2 +-
 src/server/meta_store.cpp                    | 58 +++++++++++++++---
 src/server/meta_store.h                      | 12 ++++
 src/server/pegasus_server_impl.cpp           | 90 ++++++++++++++++++++--------
 src/server/pegasus_server_impl.h             |  7 ++-
 src/server/test/pegasus_server_impl_test.cpp | 37 +++++++++++-
 8 files changed, 193 insertions(+), 37 deletions(-)
 create mode 100644 DISCLAIMER-WIP
 create mode 100644 NOTICE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 02/02: fix: Load options from file when open an exist DB (#587)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch v2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 41d9015c0e1c027773fec4c80685047ad6cc629c
Author: Zhang Yifan <ch...@163.com>
AuthorDate: Mon Aug 31 11:58:57 2020 +0800

    fix: Load options from file when open an exist DB (#587)
---
 rdsn                                         |  2 +-
 src/server/meta_store.cpp                    | 58 +++++++++++++++---
 src/server/meta_store.h                      | 12 ++++
 src/server/pegasus_server_impl.cpp           | 90 ++++++++++++++++++++--------
 src/server/pegasus_server_impl.h             |  7 ++-
 src/server/test/pegasus_server_impl_test.cpp | 37 +++++++++++-
 6 files changed, 169 insertions(+), 37 deletions(-)

diff --git a/rdsn b/rdsn
index 40d86f7..2158aeb 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 40d86f7f7aa51ba874d9cc91cb7e267f18b2bd11
+Subproject commit 2158aeb5022a9881c18bbad3addd02c68eb93323
diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp
index e1562c4..fee4e51 100644
--- a/src/server/meta_store.cpp
+++ b/src/server/meta_store.cpp
@@ -58,6 +58,19 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
     return last_flushed_decree;
 }
 
+std::string meta_store::get_usage_scenario() const
+{
+    // If the usage scenario is missing from the meta column family, return normal by default.
+    std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+    auto ec = get_string_value_from_meta_cf(false, ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario);
+    dassert_replica(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
+                    "rocksdb {} get {} from meta column family failed: {}",
+                    _db->GetName(),
+                    ROCKSDB_ENV_USAGE_SCENARIO_KEY,
+                    ec.to_string());
+    return usage_scenario;
+}
+
 ::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data,
                                                      const std::string &key,
                                                      uint64_t *value) const
@@ -72,18 +85,37 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
                                                      uint64_t *value)
 {
     std::string data;
+    auto ec = get_string_value_from_meta_cf(db, cf, read_flushed_data, key, &data);
+    if (ec != ::dsn::ERR_OK) {
+        return ec;
+    }
+    dassert_f(dsn::buf2uint64(data, *value),
+              "rocksdb {} get \"{}\" from meta column family failed to parse into uint64",
+              db->GetName(),
+              data);
+    return ::dsn::ERR_OK;
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(bool read_flushed_data,
+                                                            const std::string &key,
+                                                            std::string *value) const
+{
+    return get_string_value_from_meta_cf(_db, _meta_cf, read_flushed_data, key, value);
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(rocksdb::DB *db,
+                                                            rocksdb::ColumnFamilyHandle *cf,
+                                                            bool read_flushed_data,
+                                                            const std::string &key,
+                                                            std::string *value)
+{
     rocksdb::ReadOptions rd_opts;
     if (read_flushed_data) {
         // only read 'flushed' data, mainly to read 'last_flushed_decree'
         rd_opts.read_tier = rocksdb::kPersistedTier;
     }
-    auto status = db->Get(rd_opts, cf, key, &data);
+    auto status = db->Get(rd_opts, cf, key, value);
     if (status.ok()) {
-        dassert_f(dsn::buf2uint64(data, *value),
-                  "rocksdb {} get {} from meta column family got error value {}",
-                  db->GetName(),
-                  key,
-                  data);
         return ::dsn::ERR_OK;
     }
 
@@ -97,7 +129,13 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
 
 ::dsn::error_code meta_store::set_value_to_meta_cf(const std::string &key, uint64_t value) const
 {
-    auto status = _db->Put(_wt_opts, _meta_cf, key, std::to_string(value));
+    return set_string_value_to_meta_cf(key, std::to_string(value));
+}
+
+::dsn::error_code meta_store::set_string_value_to_meta_cf(const std::string &key,
+                                                          const std::string &value) const
+{
+    auto status = _db->Put(_wt_opts, _meta_cf, key, value);
     if (!status.ok()) {
         derror_replica(
             "Put {}={} to meta column family failed, status {}", key, value, status.ToString());
@@ -124,5 +162,11 @@ void meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac
         set_value_to_meta_cf(LAST_MANUAL_COMPACT_FINISH_TIME, last_manual_compact_finish_time));
 }
 
+void meta_store::set_usage_scenario(const std::string &usage_scenario) const
+{
+    dcheck_eq_replica(::dsn::ERR_OK,
+                      set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario));
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index 322971e..6e7a7b8 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -30,21 +30,33 @@ public:
                                          rocksdb::ColumnFamilyHandle *meta_cf) const;
     uint32_t get_data_version() const;
     uint64_t get_last_manual_compact_finish_time() const;
+    std::string get_usage_scenario() const;
 
     void set_last_flushed_decree(uint64_t decree) const;
     void set_data_version(uint32_t version) const;
     void set_last_manual_compact_finish_time(uint64_t last_manual_compact_finish_time) const;
+    void set_usage_scenario(const std::string &usage_scenario) const;
 
 private:
     ::dsn::error_code
     get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const;
+    ::dsn::error_code get_string_value_from_meta_cf(bool read_flushed_data,
+                                                    const std::string &key,
+                                                    std::string *value) const;
     ::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t value) const;
+    ::dsn::error_code set_string_value_to_meta_cf(const std::string &key,
+                                                  const std::string &value) const;
 
     static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
                                                     rocksdb::ColumnFamilyHandle *cf,
                                                     bool read_flushed_data,
                                                     const std::string &key,
                                                     uint64_t *value);
+    static ::dsn::error_code get_string_value_from_meta_cf(rocksdb::DB *db,
+                                                           rocksdb::ColumnFamilyHandle *cf,
+                                                           bool read_flushed_data,
+                                                           const std::string &key,
+                                                           std::string *value);
 
     friend class pegasus_write_service;
 
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index b181c44..b61bf31 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -8,6 +8,7 @@
 #include <boost/lexical_cast.hpp>
 #include <rocksdb/convenience.h>
 #include <rocksdb/utilities/checkpoint.h>
+#include <rocksdb/utilities/options_util.h>
 #include <dsn/utility/chrono_literals.h>
 #include <dsn/utility/utils.h>
 #include <dsn/utility/filesystem.h>
@@ -1326,14 +1327,45 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
 
     ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
 
+    // Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
+    // will be used elsewhere.
+    rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
     if (db_exist) {
-        // When DB exist, meta CF must be present.
         bool missing_meta_cf = true;
-        if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) {
-            derror_replica("check meta column family failed");
+        bool missing_data_cf = true;
+        // Load latest options from option file stored in the db directory.
+        rocksdb::DBOptions loaded_db_opt;
+        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
+        rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
+        // Set `ignore_unknown_options` true for forward compatibility.
+        auto status = rocksdb::LoadLatestOptions(path,
+                                                 rocksdb::Env::Default(),
+                                                 &loaded_db_opt,
+                                                 &loaded_cf_descs,
+                                                 /*ignore_unknown_options=*/true);
+        if (!status.ok()) {
+            derror_replica("load latest option file failed.");
             return ::dsn::ERR_LOCAL_APP_FAILURE;
         }
+        for (int i = 0; i < loaded_cf_descs.size(); ++i) {
+            if (loaded_cf_descs[i].name == META_COLUMN_FAMILY_NAME) {
+                missing_meta_cf = false;
+            } else if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
+                missing_data_cf = false;
+                loaded_data_cf_opts = loaded_cf_descs[i].options;
+            } else {
+                derror_replica("unknown column family name.");
+                return ::dsn::ERR_LOCAL_APP_FAILURE;
+            }
+        }
+        // When DB exists, meta CF and data CF must be present.
         dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
+        dassert_replica(!missing_data_cf, "Missing data column family");
+        // Reset usage scenario related options according to loaded_data_cf_opts.
+        // We don't use `loaded_data_cf_opts` directly because pointer-typed options will only be
+        // initialized with default values when calling 'LoadLatestOptions', see
+        // 'rocksdb/utilities/options_util.h'.
+        reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
     } else {
         // When create new DB, we have to create a new column family to store meta data (meta column
         // family).
@@ -1341,7 +1373,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     }
 
     std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
-        {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
+        {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
+    auto s = rocksdb::CheckOptionsCompatibility(
+        path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true);
+    if (!s.ok() && !s.IsNotFound()) {
+        derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
+        return ::dsn::ERR_LOCAL_APP_FAILURE;
+    }
     std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
     auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
     if (!status.ok()) {
@@ -1360,6 +1398,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     if (db_exist) {
         _last_committed_decree = _meta_store->get_last_flushed_decree();
         _pegasus_data_version = _meta_store->get_data_version();
+        _usage_scenario = _meta_store->get_usage_scenario();
         uint64_t last_manual_compact_finish_time =
             _meta_store->get_last_manual_compact_finish_time();
         if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
@@ -1407,8 +1446,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
 
     _is_open = true;
 
-    // set default usage scenario after db opened.
-    set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
+    if (!db_exist) {
+        // When creating a new db, update the usage scenario according to app envs.
+        update_usage_scenario(envs);
+    }
 
     dinfo_replica("start the update rocksdb statistics timer task");
     _update_replica_rdb_stat =
@@ -2472,6 +2513,7 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
         return false;
     }
     if (set_options(new_options)) {
+        _meta_store->set_usage_scenario(usage_scenario);
         _usage_scenario = usage_scenario;
         ddebug_replica(
             "set usage scenario from \"{}\" to \"{}\" succeed", old_usage_scenario, usage_scenario);
@@ -2483,6 +2525,23 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
     }
 }
 
+void pegasus_server_impl::reset_usage_scenario_options(
+    const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
+{
+    // reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
+    target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
+    target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
+    target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
+    target_opts->soft_pending_compaction_bytes_limit =
+        base_opts.soft_pending_compaction_bytes_limit;
+    target_opts->hard_pending_compaction_bytes_limit =
+        base_opts.hard_pending_compaction_bytes_limit;
+    target_opts->disable_auto_compactions = base_opts.disable_auto_compactions;
+    target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
+    target_opts->write_buffer_size = base_opts.write_buffer_size;
+    target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
+}
+
 bool pegasus_server_impl::set_options(
     const std::unordered_map<std::string, std::string> &new_options)
 {
@@ -2607,25 +2666,6 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
     // TODO(heyuchen): set filter _partition_version in further pr
 }
 
-::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, bool *missing_meta_cf)
-{
-    *missing_meta_cf = true;
-    std::vector<std::string> column_families;
-    auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
-    if (!s.ok()) {
-        derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
-        return ::dsn::ERR_LOCAL_APP_FAILURE;
-    }
-
-    for (const auto &column_family : column_families) {
-        if (column_family == META_COLUMN_FAMILY_NAME) {
-            *missing_meta_cf = false;
-            break;
-        }
-    }
-    return ::dsn::ERR_OK;
-}
-
 ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
 {
     rocksdb::FlushOptions options;
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 2ce6015..7165f76 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -166,6 +166,8 @@ private:
     friend class pegasus_compression_options_test;
     friend class pegasus_server_impl_test;
     FRIEND_TEST(pegasus_server_impl_test, default_data_version);
+    FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
+    FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
 
     friend class pegasus_manual_compact_service;
     friend class pegasus_write_service;
@@ -260,6 +262,9 @@ private:
     // return true if successfully changed
     bool set_usage_scenario(const std::string &usage_scenario);
 
+    void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
+                                      rocksdb::ColumnFamilyOptions *target_opts);
+
     // return true if successfully set
     bool set_options(const std::unordered_map<std::string, std::string> &new_options);
 
@@ -305,8 +310,6 @@ private:
         return false;
     }
 
-    ::dsn::error_code check_meta_cf(const std::string &path, bool *missing_meta_cf);
-
     void release_db();
     void release_db(rocksdb::DB *db, const std::vector<rocksdb::ColumnFamilyHandle *> &handles);
 
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index c8c16b4..d9d161a 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -11,7 +11,7 @@ namespace server {
 class pegasus_server_impl_test : public pegasus_server_test_base
 {
 public:
-    pegasus_server_impl_test() : pegasus_server_test_base() { start(); }
+    pegasus_server_impl_test() : pegasus_server_test_base() {}
 
     void test_table_level_slow_query()
     {
@@ -59,12 +59,45 @@ public:
     }
 };
 
-TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { test_table_level_slow_query(); }
+TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
+{
+    start();
+    test_table_level_slow_query();
+}
 
 TEST_F(pegasus_server_impl_test, default_data_version)
 {
+    start();
     ASSERT_EQ(_server->_pegasus_data_version, 1);
 }
 
+TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
+{
+    // open a new db with no app env.
+    start();
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
+    // set bulk_load scenario for the db.
+    ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+    rocksdb::Options opts = _server->_db->GetOptions();
+    ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
+    ASSERT_EQ(true, opts.disable_auto_compactions);
+    // reopen the db.
+    _server->stop(false);
+    start();
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+    ASSERT_EQ(opts.level0_file_num_compaction_trigger,
+              _server->_db->GetOptions().level0_file_num_compaction_trigger);
+    ASSERT_EQ(opts.disable_auto_compactions, _server->_db->GetOptions().disable_auto_compactions);
+}
+
+TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
+{
+    std::map<std::string, std::string> envs;
+    envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
+    start(envs);
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+}
+
 } // namespace server
 } // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 01/02: chore: add disclaimer for not compliant with asf policy (#589)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch v2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit eafd1254392ba7adf6a85bdd2bb4f7f1179a7a61
Author: Wu Tao <wu...@163.com>
AuthorDate: Thu Aug 27 17:55:56 2020 +0800

    chore: add disclaimer for not compliant with asf policy (#589)
---
 DISCLAIMER-WIP | 19 +++++++++++++++++++
 NOTICE         |  5 +++++
 2 files changed, 24 insertions(+)

diff --git a/DISCLAIMER-WIP b/DISCLAIMER-WIP
new file mode 100644
index 0000000..9250315
--- /dev/null
+++ b/DISCLAIMER-WIP
@@ -0,0 +1,19 @@
+Apache Pegasus is an effort undergoing incubation at The Apache Software Foundation (ASF), 
+sponsored by the Apache Incubator. Incubation is required of all newly accepted projects 
+until a further review indicates that the infrastructure, communications, and decision 
+making process have stabilized in a manner consistent with other successful ASF projects. 
+While incubation status is not necessarily a reflection of the completeness or stability 
+of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
+
+Some of the incubating project's releases may not be fully compliant with ASF policy. For 
+example, releases may have incomplete or un-reviewed licensing conditions. What follows is 
+a list of known issues the project is currently aware of (note that this list, by definition, 
+is likely to be incomplete): 
+
+ * Releases may have incomplete licensing conditions.
+ * Most of the top contributors have signed an ICLA and we are working on updating the headers.
+
+If you are planning to incorporate this work into your product/project, please be aware that
+you will need to conduct a thorough licensing review to determine the overall implications of 
+including this work. For the current status of this project through the Apache Incubator 
+visit: https://incubator.apache.org/projects/pegasus.html
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..80a4f9a
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache Pegasus
+Copyright 2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org