You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by tw...@apache.org on 2023/06/07 09:22:55 UTC
[incubator-kvrocks] branch unstable updated: Avoid manually releasing DB pointer via unique_ptr (#1487)
This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f557255c Avoid manually releasing DB pointer via unique_ptr (#1487)
f557255c is described below
commit f557255cec8d4f43cf63144b712c71026bc9320e
Author: Twice <tw...@gmail.com>
AuthorDate: Wed Jun 7 17:22:48 2023 +0800
Avoid manually releasing DB pointer via unique_ptr (#1487)
---
src/common/db_util.h | 63 +++++++++++++++++++++++++++++++++++++++++
src/storage/storage.cc | 54 ++++++++++++++++-------------------
src/storage/storage.h | 4 +--
tests/cppunit/types/set_test.cc | 2 +-
4 files changed, 90 insertions(+), 33 deletions(-)
diff --git a/src/common/db_util.h b/src/common/db_util.h
index f566449a..e394a9f6 100644
--- a/src/common/db_util.h
+++ b/src/common/db_util.h
@@ -24,6 +24,7 @@
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
+#include "rocksdb/utilities/backup_engine.h"
#include "storage/storage.h"
namespace util {
@@ -39,4 +40,66 @@ struct UniqueIterator : std::unique_ptr<rocksdb::Iterator> {
: BaseType(storage->NewIterator(options)) {}
};
+namespace details {  // helpers used by the util:: open wrappers that follow
+
+template <typename T, auto* F, Status::Code C = Status::NotOK, typename... Args>  // T: pointee type; F: factory taking a trailing T** out-param; C: code reported on failure
+StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {  // calls (*F)(args..., &ptr) and returns ptr as a unique_ptr, or an error status
+ T* ptr = nullptr;  // out-parameter slot, passed as the last argument of F
+ auto s = (*F)(std::forward<Args>(args)..., &ptr);
+
+ if (!s.ok()) {
+ return {C, s.ToString()};  // failure: code C carrying the text of F's status
+ }
+
+ return ptr;  // success: raw pointer is converted into the StatusOr<std::unique_ptr<T>> return value
+}
+
+inline rocksdb::Status DBOpenForReadOnly(const rocksdb::DBOptions& db_options, const std::string& dbname,  // plain forwarder with a fixed five-parameter signature
+ const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+ std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
+ return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);  // NOTE(review): presumably wrapped because the rocksdb signature differs from the function-pointer type used by the caller — confirm
+}
+
+} // namespace details
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& options, const std::string& dbname) {  // opens dbname through rocksdb::DB::Open and returns the DB in a unique_ptr
+ return details::WrapOutPtrToUnique<
+ rocksdb::DB,  // T: type of the pointer produced by the factory
+ static_cast<rocksdb::Status (*)(const rocksdb::Options&, const std::string&, rocksdb::DB**)>(rocksdb::DB::Open),  // cast picks the three-parameter DB::Open overload
+ Status::DBOpenErr>(options, dbname);  // a failed open is reported with code Status::DBOpenErr
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(  // column-family overload: opens dbname through the five-parameter rocksdb::DB::Open
+ const rocksdb::DBOptions& db_options, const std::string& dbname,
+ const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+ std::vector<rocksdb::ColumnFamilyHandle*>* handles) {  // handles is forwarded unchanged to DB::Open
+ return details::WrapOutPtrToUnique<
+ rocksdb::DB,  // T: type of the pointer produced by the factory
+ static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const std::string&,  // cast picks the overload taking column family descriptors and handles
+ const std::vector<rocksdb::ColumnFamilyDescriptor>&,
+ std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(rocksdb::DB::Open),
+ Status::DBOpenErr>(db_options, dbname, column_families, handles);  // a failed open is reported with code Status::DBOpenErr
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenForReadOnly(  // read-only counterpart of DBOpen; returns the DB in a unique_ptr
+ const rocksdb::DBOptions& db_options, const std::string& dbname,
+ const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
+ std::vector<rocksdb::ColumnFamilyHandle*>* handles) {  // handles is forwarded unchanged to the underlying open call
+ return details::WrapOutPtrToUnique<
+ rocksdb::DB,  // T: type of the pointer produced by the factory
+ static_cast<rocksdb::Status (*)(
+ const rocksdb::DBOptions&, const std::string&, const std::vector<rocksdb::ColumnFamilyDescriptor>&,
+ std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(details::DBOpenForReadOnly),  // goes through the details:: forwarder, which calls rocksdb::DB::OpenForReadOnly
+ Status::DBOpenErr>(db_options, dbname, column_families, handles);  // a failed open is reported with code Status::DBOpenErr
+}
+
+inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb::Env* db_env,  // opens a backup engine through rocksdb::BackupEngine::Open and returns it in a unique_ptr
+ const rocksdb::BackupEngineOptions& options) {
+ return details::WrapOutPtrToUnique<
+ rocksdb::BackupEngine,  // T: type of the pointer produced by the factory
+ static_cast<rocksdb::IOStatus (*)(rocksdb::Env*, const rocksdb::BackupEngineOptions&, rocksdb::BackupEngine**)>(  // cast picks the overload that returns rocksdb::IOStatus
+ rocksdb::BackupEngine::Open),
+ Status::DBBackupErr>(db_env, options);  // a failed open is reported with code Status::DBBackupErr
+}
+
} // namespace util
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 9e58d949..65906b6f 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -37,6 +37,7 @@
#include <random>
#include "compact_filter.h"
+#include "db_util.h"
#include "event_listener.h"
#include "event_util.h"
#include "redis_db.h"
@@ -72,9 +73,8 @@ void Storage::CloseDB() {
db_closing_ = true;
db_->SyncWAL();
- rocksdb::CancelAllBackgroundWork(db_, true);
+ rocksdb::CancelAllBackgroundWork(db_.get(), true);
for (auto handle : cf_handles_) db_->DestroyColumnFamilyHandle(handle);
- delete db_;
db_ = nullptr;
}
@@ -200,36 +200,31 @@ Status Storage::SetDBOption(const std::string &key, const std::string &value) {
}
Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
- rocksdb::DB *tmp_db = nullptr;
rocksdb::ColumnFamilyOptions cf_options(options);
- rocksdb::Status s = rocksdb::DB::Open(options, config_->db_dir, &tmp_db);
- if (s.ok()) {
+ auto res = util::DBOpen(options, config_->db_dir);
+ if (res) {
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
kPropagateColumnFamilyName, kStreamColumnFamilyName};
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
- s = tmp_db->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
+ auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
if (!s.ok()) {
- delete tmp_db;
return {Status::DBOpenErr, s.ToString()};
}
- for (auto handle : cf_handles) tmp_db->DestroyColumnFamilyHandle(handle);
- tmp_db->Close();
- delete tmp_db;
- }
-
- if (!s.ok()) {
+ for (auto handle : cf_handles) (*res)->DestroyColumnFamilyHandle(handle);
+ (*res)->Close();
+ } else {
// We try to create column families by opening the database without column families.
// If it's ok means we didn't create column families (cannot open without column families if created).
// When goes wrong, we need to check whether it's caused by column families NOT being opened or not.
// If the status message contains `Column families not opened` means that we have created the column
// families, let's ignore the error.
- std::string not_opened_prefix = "Column families not opened";
- if (s.IsInvalidArgument() && s.ToString().find(not_opened_prefix) != std::string::npos) {
+ const char *not_opened_prefix = "Column families not opened";
+ if (res.Msg().find(not_opened_prefix) != std::string::npos) {
return Status::OK();
}
- return {Status::NotOK, s.ToString()};
+ return res;
}
return Status::OK();
@@ -314,11 +309,8 @@ Status Storage::Open(bool read_only) {
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto start = std::chrono::high_resolution_clock::now();
- if (read_only) {
- s = rocksdb::DB::OpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_, &db_);
- } else {
- s = rocksdb::DB::Open(options, config_->db_dir, column_families, &cf_handles_, &db_);
- }
+ auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_)
+ : util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
auto end = std::chrono::high_resolution_clock::now();
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (!s.ok()) {
@@ -326,6 +318,7 @@ Status Storage::Open(bool read_only) {
return {Status::DBOpenErr, s.ToString()};
}
+ db_ = std::move(*dbs);
LOG(INFO) << "[storage] Success to load the data from disk: " << duration << " ms";
return Status::OK();
}
@@ -341,7 +334,7 @@ Status Storage::CreateBackup() {
// 1) Create checkpoint of rocksdb for backup
rocksdb::Checkpoint *checkpoint = nullptr;
- rocksdb::Status s = rocksdb::Checkpoint::Create(db_, &checkpoint);
+ rocksdb::Status s = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
if (!s.ok()) {
LOG(WARNING) << "Failed to create checkpoint object for backup. Error: " << s.ToString();
return {Status::NotOK, s.ToString()};
@@ -382,7 +375,6 @@ void Storage::DestroyBackup() {
return;
}
backup_->StopBackup();
- delete backup_;
backup_ = nullptr;
}
@@ -390,10 +382,11 @@ Status Storage::RestoreFromBackup() {
// TODO(@ruoshan): assert role to be slave
// We must reopen the backup engine every time, as the files is changed
rocksdb::BackupEngineOptions bk_option(config_->backup_sync_dir);
- auto s = rocksdb::BackupEngine::Open(db_->GetEnv(), bk_option, &backup_);
- if (!s.ok()) return {Status::DBBackupErr, s.ToString()};
+ auto bes = util::BackupEngineOpen(db_->GetEnv(), bk_option);
+ if (!bes) return bes;
+ backup_ = std::move(*bes);
- s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
+ auto s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
if (!s.ok()) {
LOG(ERROR) << "[storage] Failed to restore database from the latest backup. Error: " << s.ToString();
} else {
@@ -520,7 +513,7 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:
rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value) {
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
- return txn_write_batch_->GetFromBatchAndDB(db_, options, column_family, key, value);
+ return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);
}
@@ -542,7 +535,8 @@ void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamil
const size_t num_keys, const rocksdb::Slice *keys, rocksdb::PinnableSlice *values,
rocksdb::Status *statuses) {
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
- txn_write_batch_->MultiGetFromBatchAndDB(db_, options, column_family, num_keys, keys, values, statuses, false);
+ txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
+ false);
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
}
@@ -701,7 +695,7 @@ void Storage::SetIORateLimit(int64_t max_io_mb) {
rate_limiter_->SetBytesPerSecond(max_io_mb * static_cast<int64_t>(MiB));
}
-rocksdb::DB *Storage::GetDB() { return db_; }
+rocksdb::DB *Storage::GetDB() { return db_.get(); }  // hands out the raw pointer held by db_ without releasing it
Status Storage::BeginTxn() {
if (is_txn_mode_) {
@@ -834,7 +828,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
// Create checkpoint if not exist
if (!storage->env_->FileExists(data_files_dir).ok()) {
rocksdb::Checkpoint *checkpoint = nullptr;
- rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_, &checkpoint);
+ rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_.get(), &checkpoint);
if (!s.ok()) {
LOG(WARNING) << "Failed to create checkpoint object. Error: " << s.ToString();
return {Status::NotOK, s.ToString()};
diff --git a/src/storage/storage.h b/src/storage/storage.h
index c5491ce9..ae4dc577 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -178,10 +178,10 @@ class Storage {
std::string GetReplIdFromDbEngine();
private:
- rocksdb::DB *db_ = nullptr;
+ std::unique_ptr<rocksdb::DB> db_ = nullptr;
std::string replid_;
time_t backup_creating_time_;
- rocksdb::BackupEngine *backup_ = nullptr;
+ std::unique_ptr<rocksdb::BackupEngine> backup_ = nullptr;
rocksdb::Env *env_;
std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
std::shared_ptr<rocksdb::RateLimiter> rate_limiter_;
diff --git a/tests/cppunit/types/set_test.cc b/tests/cppunit/types/set_test.cc
index 65ac3fef..26dbb438 100644
--- a/tests/cppunit/types/set_test.cc
+++ b/tests/cppunit/types/set_test.cc
@@ -107,7 +107,7 @@ TEST_F(RedisSetTest, Move) {
uint64_t ret = 0;
bool flag = false;
rocksdb::Status s = set_->Add(key_, fields_, &ret);
- EXPECT_TRUE(s.ok() && static_cast<int>(fields_.size()) == ret);
+ EXPECT_TRUE(s.ok() && fields_.size() == ret);
Slice dst("set-test-move-key");
for (auto &field : fields_) {
s = set_->Move(key_, dst, field, &flag);