You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by tw...@apache.org on 2023/06/07 09:22:55 UTC

[incubator-kvrocks] branch unstable updated: Avoid manually releasing DB pointer via unique_ptr (#1487)

This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new f557255c Avoid manually releasing DB pointer via unique_ptr (#1487)
f557255c is described below

commit f557255cec8d4f43cf63144b712c71026bc9320e
Author: Twice <tw...@gmail.com>
AuthorDate: Wed Jun 7 17:22:48 2023 +0800

    Avoid manually releasing DB pointer via unique_ptr (#1487)
---
 src/common/db_util.h            | 63 +++++++++++++++++++++++++++++++++++++++++
 src/storage/storage.cc          | 54 ++++++++++++++++-------------------
 src/storage/storage.h           |  4 +--
 tests/cppunit/types/set_test.cc |  2 +-
 4 files changed, 90 insertions(+), 33 deletions(-)

diff --git a/src/common/db_util.h b/src/common/db_util.h
index f566449a..e394a9f6 100644
--- a/src/common/db_util.h
+++ b/src/common/db_util.h
@@ -24,6 +24,7 @@
 
 #include "rocksdb/db.h"
 #include "rocksdb/iterator.h"
+#include "rocksdb/utilities/backup_engine.h"
 #include "storage/storage.h"
 
 namespace util {
@@ -39,4 +40,66 @@ struct UniqueIterator : std::unique_ptr<rocksdb::Iterator> {
       : BaseType(storage->NewIterator(options)) {}
 };
 
+namespace details {
+
+template <typename T, auto* F, Status::Code C = Status::NotOK, typename... Args>
+StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {
+  T* ptr = nullptr;
+  auto s = (*F)(std::forward<Args>(args)..., &ptr);
+
+  if (!s.ok()) {
+    return {C, s.ToString()};
+  }
+
+  return ptr;
+}
+
+inline rocksdb::Status DBOpenForReadOnly(const rocksdb::DBOptions& db_options, const std::string& dbname,
+                                         const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+                                         std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
+  return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);
+}
+
+}  // namespace details
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& options, const std::string& dbname) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::DB,
+      static_cast<rocksdb::Status (*)(const rocksdb::Options&, const std::string&, rocksdb::DB**)>(rocksdb::DB::Open),
+      Status::DBOpenErr>(options, dbname);
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(
+    const rocksdb::DBOptions& db_options, const std::string& dbname,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::DB,
+      static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const std::string&,
+                                      const std::vector<rocksdb::ColumnFamilyDescriptor>&,
+                                      std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(rocksdb::DB::Open),
+      Status::DBOpenErr>(db_options, dbname, column_families, handles);
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenForReadOnly(
+    const rocksdb::DBOptions& db_options, const std::string& dbname,
+    const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+    std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::DB,
+      static_cast<rocksdb::Status (*)(
+          const rocksdb::DBOptions&, const std::string&, const std::vector<rocksdb::ColumnFamilyDescriptor>&,
+          std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(details::DBOpenForReadOnly),
+      Status::DBOpenErr>(db_options, dbname, column_families, handles);
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb::Env* db_env,
+                                                                         const rocksdb::BackupEngineOptions& options) {
+  return details::WrapOutPtrToUnique<
+      rocksdb::BackupEngine,
+      static_cast<rocksdb::IOStatus (*)(rocksdb::Env*, const rocksdb::BackupEngineOptions&, rocksdb::BackupEngine**)>(
+          rocksdb::BackupEngine::Open),
+      Status::DBBackupErr>(db_env, options);
+}
+
 }  // namespace util
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 9e58d949..65906b6f 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -37,6 +37,7 @@
 #include <random>
 
 #include "compact_filter.h"
+#include "db_util.h"
 #include "event_listener.h"
 #include "event_util.h"
 #include "redis_db.h"
@@ -72,9 +73,8 @@ void Storage::CloseDB() {
 
   db_closing_ = true;
   db_->SyncWAL();
-  rocksdb::CancelAllBackgroundWork(db_, true);
+  rocksdb::CancelAllBackgroundWork(db_.get(), true);
   for (auto handle : cf_handles_) db_->DestroyColumnFamilyHandle(handle);
-  delete db_;
   db_ = nullptr;
 }
 
@@ -200,36 +200,31 @@ Status Storage::SetDBOption(const std::string &key, const std::string &value) {
 }
 
 Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
-  rocksdb::DB *tmp_db = nullptr;
   rocksdb::ColumnFamilyOptions cf_options(options);
-  rocksdb::Status s = rocksdb::DB::Open(options, config_->db_dir, &tmp_db);
-  if (s.ok()) {
+  auto res = util::DBOpen(options, config_->db_dir);
+  if (res) {
     std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
                                          kPropagateColumnFamilyName, kStreamColumnFamilyName};
     std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
-    s = tmp_db->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
+    auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
     if (!s.ok()) {
-      delete tmp_db;
       return {Status::DBOpenErr, s.ToString()};
     }
 
-    for (auto handle : cf_handles) tmp_db->DestroyColumnFamilyHandle(handle);
-    tmp_db->Close();
-    delete tmp_db;
-  }
-
-  if (!s.ok()) {
+    for (auto handle : cf_handles) (*res)->DestroyColumnFamilyHandle(handle);
+    (*res)->Close();
+  } else {
     // We try to create the column families by opening the database without specifying any.
     // If that open succeeds, the column families have not been created yet (a database that already
     // has them cannot be opened without listing them). If it fails, we must check whether the failure
     // is just the column families not being opened: a status message containing
     // `Column families not opened` means they already exist, so the error can be ignored.
-    std::string not_opened_prefix = "Column families not opened";
-    if (s.IsInvalidArgument() && s.ToString().find(not_opened_prefix) != std::string::npos) {
+    const char *not_opened_prefix = "Column families not opened";
+    if (res.Msg().find(not_opened_prefix) != std::string::npos) {
       return Status::OK();
     }
 
-    return {Status::NotOK, s.ToString()};
+    return res;
   }
 
   return Status::OK();
@@ -314,11 +309,8 @@ Status Storage::Open(bool read_only) {
   if (!s.ok()) return {Status::NotOK, s.ToString()};
 
   auto start = std::chrono::high_resolution_clock::now();
-  if (read_only) {
-    s = rocksdb::DB::OpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_, &db_);
-  } else {
-    s = rocksdb::DB::Open(options, config_->db_dir, column_families, &cf_handles_, &db_);
-  }
+  auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_)
+                       : util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
   auto end = std::chrono::high_resolution_clock::now();
   int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
   if (!s.ok()) {
@@ -326,6 +318,7 @@ Status Storage::Open(bool read_only) {
     return {Status::DBOpenErr, s.ToString()};
   }
 
+  db_ = std::move(*dbs);
   LOG(INFO) << "[storage] Success to load the data from disk: " << duration << " ms";
   return Status::OK();
 }
@@ -341,7 +334,7 @@ Status Storage::CreateBackup() {
 
   // 1) Create checkpoint of rocksdb for backup
   rocksdb::Checkpoint *checkpoint = nullptr;
-  rocksdb::Status s = rocksdb::Checkpoint::Create(db_, &checkpoint);
+  rocksdb::Status s = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
   if (!s.ok()) {
     LOG(WARNING) << "Failed to create checkpoint object for backup. Error: " << s.ToString();
     return {Status::NotOK, s.ToString()};
@@ -382,7 +375,6 @@ void Storage::DestroyBackup() {
     return;
   }
   backup_->StopBackup();
-  delete backup_;
   backup_ = nullptr;
 }
 
@@ -390,10 +382,11 @@ Status Storage::RestoreFromBackup() {
   // TODO(@ruoshan): assert role to be slave
   // We must reopen the backup engine every time, as the files have changed
   rocksdb::BackupEngineOptions bk_option(config_->backup_sync_dir);
-  auto s = rocksdb::BackupEngine::Open(db_->GetEnv(), bk_option, &backup_);
-  if (!s.ok()) return {Status::DBBackupErr, s.ToString()};
+  auto bes = util::BackupEngineOpen(db_->GetEnv(), bk_option);
+  if (!bes) return bes;
+  backup_ = std::move(*bes);
 
-  s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
+  auto s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
   if (!s.ok()) {
     LOG(ERROR) << "[storage] Failed to restore database from the latest backup. Error: " << s.ToString();
   } else {
@@ -520,7 +513,7 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:
 rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
                              const rocksdb::Slice &key, std::string *value) {
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
-    return txn_write_batch_->GetFromBatchAndDB(db_, options, column_family, key, value);
+    return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
   }
   return db_->Get(options, column_family, key, value);
 }
@@ -542,7 +535,8 @@ void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamil
                        const size_t num_keys, const rocksdb::Slice *keys, rocksdb::PinnableSlice *values,
                        rocksdb::Status *statuses) {
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
-    txn_write_batch_->MultiGetFromBatchAndDB(db_, options, column_family, num_keys, keys, values, statuses, false);
+    txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
+                                             false);
   } else {
     db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
   }
@@ -701,7 +695,7 @@ void Storage::SetIORateLimit(int64_t max_io_mb) {
   rate_limiter_->SetBytesPerSecond(max_io_mb * static_cast<int64_t>(MiB));
 }
 
-rocksdb::DB *Storage::GetDB() { return db_; }
+rocksdb::DB *Storage::GetDB() { return db_.get(); }
 
 Status Storage::BeginTxn() {
   if (is_txn_mode_) {
@@ -834,7 +828,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
   // Create checkpoint if not exist
   if (!storage->env_->FileExists(data_files_dir).ok()) {
     rocksdb::Checkpoint *checkpoint = nullptr;
-    rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_, &checkpoint);
+    rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_.get(), &checkpoint);
     if (!s.ok()) {
       LOG(WARNING) << "Failed to create checkpoint object. Error: " << s.ToString();
       return {Status::NotOK, s.ToString()};
diff --git a/src/storage/storage.h b/src/storage/storage.h
index c5491ce9..ae4dc577 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -178,10 +178,10 @@ class Storage {
   std::string GetReplIdFromDbEngine();
 
  private:
-  rocksdb::DB *db_ = nullptr;
+  std::unique_ptr<rocksdb::DB> db_ = nullptr;
   std::string replid_;
   time_t backup_creating_time_;
-  rocksdb::BackupEngine *backup_ = nullptr;
+  std::unique_ptr<rocksdb::BackupEngine> backup_ = nullptr;
   rocksdb::Env *env_;
   std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
   std::shared_ptr<rocksdb::RateLimiter> rate_limiter_;
diff --git a/tests/cppunit/types/set_test.cc b/tests/cppunit/types/set_test.cc
index 65ac3fef..26dbb438 100644
--- a/tests/cppunit/types/set_test.cc
+++ b/tests/cppunit/types/set_test.cc
@@ -107,7 +107,7 @@ TEST_F(RedisSetTest, Move) {
   uint64_t ret = 0;
   bool flag = false;
   rocksdb::Status s = set_->Add(key_, fields_, &ret);
-  EXPECT_TRUE(s.ok() && static_cast<int>(fields_.size()) == ret);
+  EXPECT_TRUE(s.ok() && fields_.size() == ret);
   Slice dst("set-test-move-key");
   for (auto &field : fields_) {
     s = set_->Move(key_, dst, field, &flag);