You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/09/20 16:48:13 UTC
[incubator-kvrocks] branch unstable updated: Export the rocksdb write options (#885)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 01c2d87 Export the rocksdb write options (#885)
01c2d87 is described below
commit 01c2d87ce0d8e8bcd4cd2f0ff5ae9150040dad8d
Author: Andrei Vydrin <tu...@gmail.com>
AuthorDate: Tue Sep 20 23:48:05 2022 +0700
Export the rocksdb write options (#885)
---
kvrocks.conf | 39 +++++++++++++++++++++++++++++++++++++++
src/config.cc | 8 ++++++++
src/config.h | 10 +++++++++-
src/redis_bitmap.cc | 6 +++---
src/redis_bitmap_string.cc | 2 +-
src/redis_cmd.cc | 3 +++
src/redis_db.cc | 4 ++--
src/redis_hash.cc | 8 ++++----
src/redis_list.cc | 18 +++++++++---------
src/redis_pubsub.cc | 2 +-
src/redis_set.cc | 8 ++++----
src/redis_sortedint.cc | 4 ++--
src/redis_stream.cc | 6 +++---
src/redis_string.cc | 10 +++++-----
src/redis_zset.cc | 14 +++++++-------
src/server.cc | 2 +-
src/storage.cc | 17 +++++++++++++----
src/storage.h | 4 ++++
18 files changed, 118 insertions(+), 47 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index 3847f07..2ef247b 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -711,5 +711,44 @@ rocksdb.max_bytes_for_level_base 268435456
# Default: 10
rocksdb.max_bytes_for_level_multiplier 10
+# If yes, the write will be flushed from the operating system
+# buffer cache before the write is considered complete.
+# If this flag is enabled, writes will be slower.
+# If this flag is disabled, and the machine crashes, some recent
+# rites may be lost. Note that if it is just the process that
+# crashes (i.e., the machine does not reboot), no writes will be
+# lost even if sync==false.
+#
+# Default: no
+rocksdb.write_options.sync no
+
+# If yes, writes will not first go to the write ahead log,
+# and the write may get lost after a crash.
+#
+# Deafult: no
+rocksdb.write_options.disable_wal no
+
+# If enabled and we need to wait or sleep for the write request, fails
+# immediately.
+#
+# Default: no
+rocksdb.write_options.no_slowdown no
+
+# If enabled, write requests are of lower priority if compaction is
+# behind. In this case, no_slowdown = true, the request will be canceled
+# immediately. Otherwise, it will be slowed down.
+# The slowdown value is determined by RocksDB to guarantee
+# it introduces minimum impacts to high priority writes.
+#
+# Default: no
+rocksdb.write_options.low_pri no
+
+# If enabled, this writebatch will maintain the last insert positions of each
+# memtable as hints in concurrent write. It can improve write performance
+# in concurrent writes if keys in one writebatch are sequential.
+#
+# Default: no
+rocksdb.write_options.memtable_insert_hint_per_batch no
+
################################ NAMESPACE #####################################
# namespace.test change.me
diff --git a/src/config.cc b/src/config.cc
index a3cc93a..16c0972 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -203,6 +203,14 @@ Config::Config() {
false, new IntField(&RocksDB.max_bytes_for_level_multiplier, 10, 1, 100)},
{"rocksdb.level_compaction_dynamic_level_bytes",
false, new YesNoField(&RocksDB.level_compaction_dynamic_level_bytes, false)},
+
+ /* rocksdb write options */
+ {"rocksdb.write_options.sync", true, new YesNoField(&RocksDB.write_options.sync, false)},
+ {"rocksdb.write_options.disable_wal", true, new YesNoField(&RocksDB.write_options.disable_WAL, false)},
+ {"rocksdb.write_options.no_slowdown", true, new YesNoField(&RocksDB.write_options.no_slowdown, false)},
+ {"rocksdb.write_options.low_pri", true, new YesNoField(&RocksDB.write_options.low_pri, false)},
+ {"rocksdb.write_options.memtable_insert_hint_per_batch",
+ true, new YesNoField(&RocksDB.write_options.memtable_insert_hint_per_batch, false)},
};
for (auto &wrapper : fields) {
auto &field = wrapper.field;
diff --git a/src/config.h b/src/config.h
index 2eba3f4..dec091a 100644
--- a/src/config.h
+++ b/src/config.h
@@ -141,7 +141,7 @@ struct Config{
std::set<std::string> profiling_sample_commands;
bool profiling_sample_all_commands = false;
- struct {
+ struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
int metadata_block_cache_size;
@@ -175,6 +175,14 @@ struct Config{
int max_bytes_for_level_base;
int max_bytes_for_level_multiplier;
bool level_compaction_dynamic_level_bytes;
+
+ struct WriteOptions {
+ bool sync;
+ bool disable_WAL;
+ bool no_slowdown;
+ bool low_pri;
+ bool memtable_insert_hint_per_batch;
+ } write_options;
} RocksDB;
public:
diff --git a/src/redis_bitmap.cc b/src/redis_bitmap.cc
index 4fbcfe4..02b316b 100644
--- a/src/redis_bitmap.cc
+++ b/src/redis_bitmap.cc
@@ -232,7 +232,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t stop, uint32_t *cnt) {
@@ -384,7 +384,7 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name,
rocksdb::WriteBatch batch;
if (max_size == 0) {
batch.Delete(metadata_cf_handle_, ns_key);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
std::vector<std::string> log_args = {std::to_string(kRedisCmdBitOp), op_name};
for (const auto &op_key : op_keys) {
@@ -535,7 +535,7 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name,
res_metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
*len = max_size;
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t offset) {
diff --git a/src/redis_bitmap_string.cc b/src/redis_bitmap_string.cc
index 8badca1..acb2f73 100644
--- a/src/redis_bitmap_string.cc
+++ b/src/redis_bitmap_string.cc
@@ -64,7 +64,7 @@ rocksdb::Status BitmapString::SetBit(const Slice &ns_key,
WriteBatchLogData log_data(kRedisString);
batch.PutLogData(log_data.Encode());
batch.Put(metadata_cf_handle_, ns_key, *raw_value);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t start, int64_t stop, uint32_t *cnt) {
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index 89fb81f..9ece7bf 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -3731,6 +3731,9 @@ class CommandSlaveOf : public Commander {
if (svr->GetConfig()->cluster_enabled) {
return Status(Status::RedisExecErr, "can't change to slave in cluster mode");
}
+ if (svr->GetConfig()->RocksDB.write_options.disable_WAL) {
+ return Status(Status::RedisExecErr, "slaveof doesn't work with disable_wal option");
+ }
if (!conn->IsAdmin()) {
*output = Redis::Error(errAdministorPermissionRequired);
return Status::OK();
diff --git a/src/redis_db.cc b/src/redis_db.cc
index 8f86c9a..b748146 100644
--- a/src/redis_db.cc
+++ b/src/redis_db.cc
@@ -99,7 +99,7 @@ rocksdb::Status Database::Expire(const Slice &user_key, int timestamp) {
WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)});
batch.PutLogData(log_data.Encode());
batch.Put(metadata_cf_handle_, ns_key, Slice(buf, value.size()));
- s = storage_->Write(rocksdb::WriteOptions(), &batch);
+ s = storage_->Write(storage_->DefaultWriteOptions(), &batch);
delete[]buf;
return s;
}
@@ -117,7 +117,7 @@ rocksdb::Status Database::Del(const Slice &user_key) {
if (metadata.Expired()) {
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
- return storage_->Delete(rocksdb::WriteOptions(), metadata_cf_handle_, ns_key);
+ return storage_->Delete(storage_->DefaultWriteOptions(), metadata_cf_handle_, ns_key);
}
rocksdb::Status Database::Exists(const std::vector<Slice> &keys, int *ret) {
diff --git a/src/redis_hash.cc b/src/redis_hash.cc
index 078aa2a..0fbf407 100644
--- a/src/redis_hash.cc
+++ b/src/redis_hash.cc
@@ -105,7 +105,7 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Hash::IncrByFloat(const Slice &user_key, const Slice &field, double increment, double *ret) {
@@ -155,7 +155,7 @@ rocksdb::Status Hash::IncrByFloat(const Slice &user_key, const Slice &field, dou
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Hash::MGet(const Slice &user_key,
@@ -231,7 +231,7 @@ rocksdb::Status Hash::Delete(const Slice &user_key, const std::vector<Slice> &fi
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue> &field_values, bool nx, int *ret) {
@@ -272,7 +272,7 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Hash::Range(const Slice &user_key, const Slice &start, const Slice &stop,
diff --git a/src/redis_list.cc b/src/redis_list.cc
index 16fe779..a354f37 100644
--- a/src/redis_list.cc
+++ b/src/redis_list.cc
@@ -88,7 +88,7 @@ rocksdb::Status List::push(const Slice &user_key,
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
*ret = metadata.size;
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status List::Pop(const Slice &user_key, bool left, std::string *elem) {
@@ -147,7 +147,7 @@ rocksdb::Status List::PopMulti(const rocksdb::Slice &user_key, bool left, uint32
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
/*
@@ -262,7 +262,7 @@ rocksdb::Status List::Rem(const Slice &user_key, int count, const Slice &elem, i
}
*ret = static_cast<int>(to_delete_indexes.size());
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status List::Insert(const Slice &user_key, const Slice &pivot, const Slice &elem, bool before, int *ret) {
@@ -347,7 +347,7 @@ rocksdb::Status List::Insert(const Slice &user_key, const Slice &pivot, const Sl
batch.Put(metadata_cf_handle_, ns_key, bytes);
*ret = metadata.size;
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status List::Index(const Slice &user_key, int index, std::string *elem) {
@@ -447,7 +447,7 @@ rocksdb::Status List::Set(const Slice &user_key, int index, Slice elem) {
log_data(kRedisList, {std::to_string(kRedisCmdLSet), std::to_string(index)});
batch.PutLogData(log_data.Encode());
batch.Put(sub_key, elem);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status List::RPopLPush(const Slice &src, const Slice &dst, std::string *elem) {
@@ -534,7 +534,7 @@ rocksdb::Status List::lmoveOnSingleList(const rocksdb::Slice &src, bool src_left
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status List::lmoveOnTwoLists(const rocksdb::Slice &src, const rocksdb::Slice &dst,
@@ -598,7 +598,7 @@ rocksdb::Status List::lmoveOnTwoLists(const rocksdb::Slice &src, const rocksdb::
dst_metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, dst_ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
// Caution: trim the big list may block the server
@@ -617,7 +617,7 @@ rocksdb::Status List::Trim(const Slice &user_key, int start, int stop) {
// the result will be empty list when start > stop,
// or start is larger than the end of list
if (start > stop) {
- return storage_->Delete(rocksdb::WriteOptions(), metadata_cf_handle_, ns_key);
+ return storage_->Delete(storage_->DefaultWriteOptions(), metadata_cf_handle_, ns_key);
}
if (start < 0) start = 0;
@@ -655,6 +655,6 @@ rocksdb::Status List::Trim(const Slice &user_key, int start, int stop) {
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
} // namespace Redis
diff --git a/src/redis_pubsub.cc b/src/redis_pubsub.cc
index 5be6ff8..78c0297 100644
--- a/src/redis_pubsub.cc
+++ b/src/redis_pubsub.cc
@@ -24,6 +24,6 @@ namespace Redis {
rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) {
rocksdb::WriteBatch batch;
batch.Put(pubsub_cf_handle_, channel, value);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
} // namespace Redis
diff --git a/src/redis_set.cc b/src/redis_set.cc
index 8402149..b789ffa 100644
--- a/src/redis_set.cc
+++ b/src/redis_set.cc
@@ -51,7 +51,7 @@ rocksdb::Status Set::Overwrite(Slice user_key, const std::vector<std::string> &m
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Set::Add(const Slice &user_key, const std::vector<Slice> &members, int *ret) {
@@ -83,7 +83,7 @@ rocksdb::Status Set::Add(const Slice &user_key, const std::vector<Slice> &member
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Set::Remove(const Slice &user_key, const std::vector<Slice> &members, int *ret) {
@@ -118,7 +118,7 @@ rocksdb::Status Set::Remove(const Slice &user_key, const std::vector<Slice> &mem
batch.Delete(metadata_cf_handle_, ns_key);
}
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Set::Card(const Slice &user_key, int *ret) {
@@ -244,7 +244,7 @@ rocksdb::Status Set::Take(const Slice &user_key, std::vector<std::string> *membe
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Set::Move(const Slice &src, const Slice &dst, const Slice &member, int *ret) {
diff --git a/src/redis_sortedint.cc b/src/redis_sortedint.cc
index e1b3703..97e5fec 100644
--- a/src/redis_sortedint.cc
+++ b/src/redis_sortedint.cc
@@ -63,7 +63,7 @@ rocksdb::Status Sortedint::Add(const Slice &user_key, std::vector<uint64_t> ids,
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Sortedint::Remove(const Slice &user_key, std::vector<uint64_t> ids, int *ret) {
@@ -95,7 +95,7 @@ rocksdb::Status Sortedint::Remove(const Slice &user_key, std::vector<uint64_t> i
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Sortedint::Card(const Slice &user_key, int *ret) {
diff --git a/src/redis_stream.cc b/src/redis_stream.cc
index 72c4ea9..184e8f6 100644
--- a/src/redis_stream.cc
+++ b/src/redis_stream.cc
@@ -155,7 +155,7 @@ rocksdb::Status Stream::Add(const Slice &stream_name, const StreamAddOptions& op
*id = next_entry_id;
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Stream::getNextEntryID(const StreamMetadata &metadata, const StreamAddOptions &options,
@@ -296,7 +296,7 @@ rocksdb::Status Stream::DeleteEntries(const rocksdb::Slice &stream_name,
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status Stream::Len(const rocksdb::Slice &stream_name, uint64_t *ret) {
@@ -525,7 +525,7 @@ rocksdb::Status Stream::Trim(const rocksdb::Slice &stream_name, const StreamTrim
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
return rocksdb::Status::OK();
diff --git a/src/redis_string.cc b/src/redis_string.cc
index 20ea14c..67e5e0d 100644
--- a/src/redis_string.cc
+++ b/src/redis_string.cc
@@ -103,7 +103,7 @@ rocksdb::Status String::updateRawValue(const std::string &ns_key, const std::str
WriteBatchLogData log_data(kRedisString);
batch.PutLogData(log_data.Encode());
batch.Put(metadata_cf_handle_, ns_key, raw_value);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status String::Append(const std::string &user_key, const std::string &value, int *ret) {
@@ -173,7 +173,7 @@ rocksdb::Status String::GetDel(const std::string &user_key, std::string *value)
rocksdb::Status s = getValue(ns_key, value);
if (!s.ok()) return s;
- return storage_->Delete(rocksdb::WriteOptions(), metadata_cf_handle_, ns_key);
+ return storage_->Delete(storage_->DefaultWriteOptions(), metadata_cf_handle_, ns_key);
}
rocksdb::Status String::Set(const std::string &user_key, const std::string &value) {
@@ -357,7 +357,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, int ttl) {
AppendNamespacePrefix(pair.key, &ns_key);
batch.Put(metadata_cf_handle_, ns_key, bytes);
LockGuard guard(storage_->GetLockManager(), ns_key);
- auto s = storage_->Write(rocksdb::WriteOptions(), &batch);
+ auto s = storage_->Write(storage_->DefaultWriteOptions(), &batch);
if (!s.ok()) return s;
}
return rocksdb::Status::OK();
@@ -398,7 +398,7 @@ rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, int ttl, in
WriteBatchLogData log_data(kRedisString);
batch.PutLogData(log_data.Encode());
batch.Put(metadata_cf_handle_, ns_key, bytes);
- auto s = storage_->Write(rocksdb::WriteOptions(), &batch);
+ auto s = storage_->Write(storage_->DefaultWriteOptions(), &batch);
if (!s.ok()) return s;
}
*ret = 1;
@@ -472,7 +472,7 @@ rocksdb::Status String::CAD(const std::string &user_key, const std::string &valu
}
if (value == current_value) {
- auto delete_status = storage_->Delete(rocksdb::WriteOptions(),
+ auto delete_status = storage_->Delete(storage_->DefaultWriteOptions(),
storage_->GetCFHandle(Engine::kMetadataColumnFamilyName),
ns_key);
if (!delete_status.ok()) {
diff --git a/src/redis_zset.cc b/src/redis_zset.cc
index f29e4dc..a1fa0e3 100644
--- a/src/redis_zset.cc
+++ b/src/redis_zset.cc
@@ -113,7 +113,7 @@ rocksdb::Status ZSet::Add(const Slice &user_key, uint8_t flags, std::vector<Memb
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status ZSet::Card(const Slice &user_key, int *ret) {
@@ -204,7 +204,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, std::vecto
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status ZSet::Range(const Slice &user_key, int start, int stop, uint8_t flags, std::vector<MemberScore>
@@ -280,7 +280,7 @@ rocksdb::Status ZSet::Range(const Slice &user_key, int start, int stop, uint8_t
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
return rocksdb::Status::OK();
}
@@ -400,7 +400,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key,
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
return rocksdb::Status::OK();
}
@@ -494,7 +494,7 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key,
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
return rocksdb::Status::OK();
}
@@ -552,7 +552,7 @@ rocksdb::Status ZSet::Remove(const Slice &user_key, const std::vector<Slice> &me
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status ZSet::RemoveRangeByScore(const Slice &user_key, ZRangeSpec spec, int *ret) {
@@ -648,7 +648,7 @@ rocksdb::Status ZSet::Overwrite(const Slice &user_key, const std::vector<MemberS
std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
- return storage_->Write(rocksdb::WriteOptions(), &batch);
+ return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}
rocksdb::Status ZSet::InterStore(const Slice &dst,
diff --git a/src/server.cc b/src/server.cc
index 7f4ba93..6818d6d 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -1444,7 +1444,7 @@ void Server::ScriptReset() {
void Server::ScriptFlush() {
auto cf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName);
- storage_->FlushScripts(rocksdb::WriteOptions(), cf);
+ storage_->FlushScripts(storage_->DefaultWriteOptions(), cf);
ScriptReset();
}
diff --git a/src/storage.cc b/src/storage.cc
index 6ae9323..73da1e0 100644
--- a/src/storage.cc
+++ b/src/storage.cc
@@ -73,6 +73,7 @@ Storage::Storage(Config *config)
SetCheckpointCreateTime(0);
SetCheckpointAccessTime(0);
backup_creating_time_ = std::time(nullptr);
+ SetWriteOptions(config->RocksDB.write_options);
}
Storage::~Storage() {
@@ -94,6 +95,14 @@ void Storage::CloseDB() {
db_ = nullptr;
}
+void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions& config) {
+ write_opts_.sync = config.sync;
+ write_opts_.disableWAL = config.disable_WAL;
+ write_opts_.no_slowdown = config.no_slowdown;
+ write_opts_.low_pri = config.low_pri;
+ write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
+}
+
rocksdb::BlockBasedTableOptions Storage::InitTableOptions() {
rocksdb::BlockBasedTableOptions table_options;
table_options.format_version = 5;
@@ -533,7 +542,7 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st
if (!s.ok()) {
return s;
}
- return Write(rocksdb::WriteOptions(), &batch);
+ return Write(write_opts_, &batch);
}
rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle) {
@@ -547,7 +556,7 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
if (!s.ok()) {
return s;
}
- return Write(rocksdb::WriteOptions(), &batch);
+ return Write(options, &batch);
}
Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
@@ -555,7 +564,7 @@ Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return Status(Status::NotOK, "reach space limit");
}
auto bat = rocksdb::WriteBatch(std::move(raw_batch));
- auto s = db_->Write(rocksdb::WriteOptions(), &bat);
+ auto s = db_->Write(write_opts_, &bat);
if (!s.ok()) {
return Status(Status::NotOK, s.ToString());
}
@@ -647,7 +656,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch.Put(cf, key, value);
- auto s = Write(rocksdb::WriteOptions(), &batch);
+ auto s = Write(write_opts_, &batch);
if (!s.ok()) {
return Status(Status::NotOK, s.ToString());
}
diff --git a/src/storage.h b/src/storage.h
index 848ddd2..1ba5eee 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -65,6 +65,7 @@ class Storage {
explicit Storage(Config *config);
~Storage();
+ void SetWriteOptions(const Config::RocksDB::WriteOptions& config);
Status Open(bool read_only);
Status Open();
Status OpenForReadOnly();
@@ -86,6 +87,7 @@ class Storage {
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeq();
rocksdb::Status Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates);
+ const rocksdb::WriteOptions& DefaultWriteOptions() { return write_opts_; }
rocksdb::Status Delete(const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle,
const rocksdb::Slice &key);
@@ -187,6 +189,8 @@ class Storage {
bool db_closing_ = true;
std::atomic<bool> db_in_retryable_io_error_{false};
+
+ rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();
};
} // namespace Engine