You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/01/28 04:18:08 UTC

[incubator-kvrocks] branch unstable updated: Allow to enable the async_io option to improve the performance (#1215)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c41bec89 Allow to enable the async_io option to improve the performance (#1215)
c41bec89 is described below

commit c41bec89045c0c0c800296c6b1bcec48bf2a4461
Author: xiaobiaozhao <52...@users.noreply.github.com>
AuthorDate: Sat Jan 28 12:18:03 2023 +0800

    Allow to enable the async_io option to improve the performance (#1215)
    
    RocksDB supports async IO, which mitigates the impact of storage latency by reading
    asynchronously and in parallel as much as possible to hide IO latency. Currently, it
    only takes effect in MultiGet and Iterators:
    
    In Iterators, it will prefetch data asynchronously in the background for each file being iterated on.
    In MultiGet, it will read the necessary data blocks from those files in parallel as much as possible.
    
    For the current implementation, we only allow enabling the async io in Iterator, because the MultiGet
    operation depends on the folly library and C++20, which are NOT expected to be introduced in Kvrocks.
    ---------
    
    Co-authored-by: hulk <hu...@gmail.com>
    Co-authored-by: Twice <tw...@gmail.com>
---
 kvrocks.conf                  | 10 +++++++++-
 src/cluster/slot_migrate.cc   |  6 +++---
 src/config/config.cc          |  3 +++
 src/config/config.h           |  4 ++++
 src/storage/redis_db.cc       | 12 ++++++------
 src/storage/storage.cc        |  5 +++++
 src/storage/storage.h         |  1 +
 src/types/redis_bitmap.cc     |  2 +-
 src/types/redis_hash.cc       |  4 ++--
 src/types/redis_list.cc       |  6 +++---
 src/types/redis_set.cc        |  4 ++--
 src/types/redis_sortedint.cc  |  4 ++--
 src/types/redis_stream.cc     |  6 +++---
 src/types/redis_zset.cc       | 10 +++++-----
 utils/kvrocks2redis/parser.cc |  2 +-
 15 files changed, 50 insertions(+), 29 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index 7932477f..5119d49e 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -107,7 +107,7 @@ log-level info
 #
 # if set to -1, that means to disable the log cleaner.
 # if set to 0, all previous INFO level logs will be immediately removed.
-# if set to between 0 to INT_MAX, that means it will retent latest N(log-retention-days) day logs. 
+# if set to between 0 to INT_MAX, that means it will retent latest N(log-retention-days) day logs.
 
 # By default the log-retention-days is -1.
 log-retention-days -1
@@ -726,6 +726,14 @@ rocksdb.max_bytes_for_level_base 268435456
 # Default: 10
 rocksdb.max_bytes_for_level_multiplier 10
 
+# This feature only takes effect in Iterators and MultiGet.
+# If yes, RocksDB will try to read asynchronously and in parallel as much as possible to hide IO latency.
+# In iterators, it will prefetch data asynchronously in the background for each file being iterated on. 
+# In MultiGet, it will read the necessary data blocks from those files in parallel as much as possible.
+
+# Default no
+rocksdb.read_options.async_io no
+
 # If yes, the write will be flushed from the operating system
 # buffer cache before the write is considered complete.
 # If this flag is enabled, writes will be slower.
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 32549e82..e1d3a508 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -290,7 +290,7 @@ Status SlotMigrate::SendSnapshot() {
 
   rocksdb::ReadOptions read_options;
   read_options.snapshot = slot_snapshot_;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(Engine::kMetadataColumnFamilyName);
   std::unique_ptr<rocksdb::Iterator> iter(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
@@ -643,7 +643,7 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata
   std::vector<std::string> user_cmd = {cmd, key.ToString()};
   rocksdb::ReadOptions read_options;
   read_options.snapshot = slot_snapshot_;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   std::unique_ptr<rocksdb::Iterator> iter(storage_->GetDB()->NewIterator(read_options));
 
   // Construct key prefix to iterate values of the complex type user key
@@ -744,7 +744,7 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata
 Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) {
   rocksdb::ReadOptions read_options;
   read_options.snapshot = slot_snapshot_;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   std::unique_ptr<rocksdb::Iterator> iter(
       storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(Engine::kStreamColumnFamilyName)));
 
diff --git a/src/config/config.cc b/src/config/config.cc
index 066a6c08..61d78eaf 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -212,6 +212,9 @@ Config::Config() {
       {"rocksdb.write_options.low_pri", true, new YesNoField(&RocksDB.write_options.low_pri, false)},
       {"rocksdb.write_options.memtable_insert_hint_per_batch", true,
        new YesNoField(&RocksDB.write_options.memtable_insert_hint_per_batch, false)},
+
+      /* rocksdb read options */
+      {"rocksdb.read_options.async_io", false, new YesNoField(&RocksDB.read_options.async_io, false)},
   };
   for (auto &wrapper : fields) {
     auto &field = wrapper.field;
diff --git a/src/config/config.h b/src/config/config.h
index e3534220..9df5e703 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -189,6 +189,10 @@ struct Config {
       bool low_pri;
       bool memtable_insert_hint_per_batch;
     } write_options;
+
+    struct ReadOptions {
+      bool async_io;
+    } read_options;
   } RocksDB;
 
   mutable std::mutex backup_mu_;
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index b80614e0..c700599d 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -181,7 +181,7 @@ void Database::Keys(const std::string &prefix, std::vector<std::string> *keys, K
   LatestSnapShot ss(db_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
 
   while (true) {
@@ -235,7 +235,7 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
   LatestSnapShot ss(db_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
 
   AppendNamespacePrefix(cursor, &ns_cursor);
@@ -359,7 +359,7 @@ rocksdb::Status Database::FlushAll() {
   LatestSnapShot ss(db_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
   iter->SeekToFirst();
   if (!iter->Valid()) {
@@ -465,7 +465,7 @@ rocksdb::Status Database::FindKeyRangeWithPrefix(const std::string &prefix, cons
   LatestSnapShot ss(storage_->GetDB());
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = DBUtil::UniqueIterator(storage_->GetDB(), read_options, cf_handle);
   iter->Seek(prefix);
   if (!iter->Valid() || !iter->key().starts_with(prefix)) {
@@ -520,7 +520,7 @@ rocksdb::Status Database::GetSlotKeysInfo(int slot, std::map<int, uint64_t> *slo
   snapshot = storage_->GetDB()->GetSnapshot();
   rocksdb::ReadOptions read_options;
   read_options.snapshot = snapshot;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = db_->NewIterator(read_options, metadata_cf_handle_);
   bool end = false;
   for (int i = 0; i < HASH_SLOTS_SIZE; i++) {
@@ -567,7 +567,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
   LatestSnapShot ss(db_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   std::string match_prefix_key;
   if (!subkey_prefix.empty()) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index ef020bfd..7ef62391 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -103,6 +103,11 @@ void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions &config) {
   write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
 }
 
+void Storage::SetReadOptions(rocksdb::ReadOptions &read_options) {
+  read_options.fill_cache = false;
+  read_options.async_io = config_->RocksDB.read_options.async_io;
+}
+
 rocksdb::BlockBasedTableOptions Storage::InitTableOptions() {
   rocksdb::BlockBasedTableOptions table_options;
   table_options.format_version = 5;
diff --git a/src/storage/storage.h b/src/storage/storage.h
index ee28e6b6..7342b193 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -68,6 +68,7 @@ class Storage {
   ~Storage();
 
   void SetWriteOptions(const Config::RocksDB::WriteOptions &config);
+  void SetReadOptions(rocksdb::ReadOptions &read_options);
   Status Open(bool read_only = false);
   void CloseDB();
   void EmptyDB();
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index c81d5414..1b329b98 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -121,7 +121,7 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, const uint32_t max_btos
   rocksdb::ReadOptions read_options;
   LatestSnapShot ss(db_);
   read_options.snapshot = ss.GetSnapShot();
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
   uint32_t frag_index = 0, valid_size = 0;
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index f2ea1c27..539740c6 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -296,7 +296,7 @@ rocksdb::Status Hash::RangeByLex(const Slice &user_key, const CommonRangeLexSpec
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   if (!spec.reversed) {
@@ -353,7 +353,7 @@ rocksdb::Status Hash::GetAll(const Slice &user_key, std::vector<FieldValue> *fie
   read_options.snapshot = ss.GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix_key);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) {
diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc
index d049bfda..7c5c50db 100644
--- a/src/types/redis_list.cc
+++ b/src/types/redis_list.cc
@@ -193,7 +193,7 @@ rocksdb::Status List::Rem(const Slice &user_key, int count, const Slice &elem, i
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix);
@@ -282,7 +282,7 @@ rocksdb::Status List::Insert(const Slice &user_key, const Slice &pivot, const Sl
   read_options.snapshot = ss.GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
@@ -391,7 +391,7 @@ rocksdb::Status List::Range(const Slice &user_key, int start, int stop, std::vec
   read_options.snapshot = ss.GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(start_key); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc
index ffcb182c..8b492863 100644
--- a/src/types/redis_set.cc
+++ b/src/types/redis_set.cc
@@ -152,7 +152,7 @@ rocksdb::Status Set::Members(const Slice &user_key, std::vector<std::string> *me
   read_options.snapshot = ss.GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
@@ -225,7 +225,7 @@ rocksdb::Status Set::Take(const Slice &user_key, std::vector<std::string> *membe
   read_options.snapshot = ss.GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
diff --git a/src/types/redis_sortedint.cc b/src/types/redis_sortedint.cc
index a79a1ac6..0641c836 100644
--- a/src/types/redis_sortedint.cc
+++ b/src/types/redis_sortedint.cc
@@ -139,7 +139,7 @@ rocksdb::Status Sortedint::Range(const Slice &user_key, uint64_t cursor_id, uint
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   uint64_t id = 0, pos = 0;
   auto iter = DBUtil::UniqueIterator(db_, read_options);
@@ -180,7 +180,7 @@ rocksdb::Status Sortedint::RangeByValue(const Slice &user_key, SortedintRangeSpe
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   int pos = 0;
   auto iter = DBUtil::UniqueIterator(db_, read_options);
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index b8911364..1a020093 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -249,7 +249,7 @@ rocksdb::Status Stream::DeleteEntries(const rocksdb::Slice &stream_name, const s
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options, stream_cf_handle_);
 
@@ -364,7 +364,7 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options, stream_cf_handle_);
   iter->Seek(start_key);
@@ -567,7 +567,7 @@ uint64_t Stream::trim(const std::string &ns_key, const StreamTrimOptions &option
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options, stream_cf_handle_);
   std::string start_key = internalKeyFromEntryID(ns_key, *metadata, metadata->first_entry_id);
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index bc2b7ef5..756118f9 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -193,7 +193,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, std::vecto
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options, score_cf_handle_);
   iter->Seek(start_key);
@@ -261,7 +261,7 @@ rocksdb::Status ZSet::Range(const Slice &user_key, int start, int stop, uint8_t
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   rocksdb::WriteBatch batch;
   auto iter = DBUtil::UniqueIterator(db_, read_options, score_cf_handle_);
@@ -363,7 +363,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, ZRangeSpec spec, std::
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   int pos = 0;
   auto iter = DBUtil::UniqueIterator(db_, read_options, score_cf_handle_);
@@ -446,7 +446,7 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const CommonRangeLexSpec
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   int pos = 0;
   auto iter = DBUtil::UniqueIterator(db_, read_options);
@@ -607,7 +607,7 @@ rocksdb::Status ZSet::Rank(const Slice &user_key, const Slice &member, bool reve
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   auto iter = DBUtil::UniqueIterator(db_, read_options, score_cf_handle_);
   iter->Seek(start_key);
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index 1414183b..5c025101 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -94,7 +94,7 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
   read_options.snapshot = latest_snapshot_->GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix_key);
   read_options.iterate_upper_bound = &upper_bound;
-  read_options.fill_cache = false;
+  storage_->SetReadOptions(read_options);
 
   std::string output;
   auto iter = DBUtil::UniqueIterator(db_, read_options);