You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by tw...@apache.org on 2023/08/26 14:43:58 UTC
[kvrocks] branch unstable updated: Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)
This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 6bcd387f Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)
6bcd387f is described below
commit 6bcd387f7f91f94cc0a18b08ec9d190ca528a115
Author: Twice <tw...@gmail.com>
AuthorDate: Sat Aug 26 23:43:53 2023 +0900
Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)
Co-authored-by: hulk <hu...@gmail.com>
---
src/server/server.cc | 14 +++++---------
src/server/server.h | 8 ++++----
src/storage/redis_metadata.cc | 9 +++------
src/storage/redis_metadata.h | 26 ++++++++++++++------------
src/storage/storage.cc | 3 +++
src/storage/table_properties_collector.cc | 29 ++++++++++-------------------
src/storage/table_properties_collector.h | 4 ++--
7 files changed, 41 insertions(+), 52 deletions(-)
diff --git a/src/server/server.cc b/src/server/server.cc
index b07821e7..b00decf6 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -621,11 +621,7 @@ void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key,
}
}
-void Server::updateCachedTime() {
- time_t ret = util::GetTimeStamp();
- if (ret == -1) return;
- unix_time.store(static_cast<int>(ret));
-}
+void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
int Server::IncrClientNum() {
total_clients_.fetch_add(1, std::memory_order::memory_order_relaxed);
@@ -983,9 +979,9 @@ void Server::SetLastRandomKeyCursor(const std::string &cursor) {
last_random_key_cursor_ = cursor;
}
-int Server::GetCachedUnixTime() {
+int64_t Server::GetCachedUnixTime() {
if (unix_time.load() == 0) {
- unix_time.store(static_cast<int>(util::GetTimeStamp()));
+ updateCachedTime();
}
return unix_time.load();
}
@@ -1274,9 +1270,9 @@ Status Server::AsyncBgSaveDB() {
std::lock_guard<std::mutex> lg(db_job_mu_);
is_bgsave_in_progress_ = false;
- last_bgsave_time_ = static_cast<int>(start_bgsave_time);
+ last_bgsave_time_ = start_bgsave_time;
last_bgsave_status_ = s.IsOK() ? "ok" : "err";
- last_bgsave_time_sec_ = static_cast<int>(stop_bgsave_time - start_bgsave_time);
+ last_bgsave_time_sec_ = stop_bgsave_time - start_bgsave_time;
});
}
diff --git a/src/server/server.h b/src/server/server.h
index 7ac6128a..b598e520 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -210,7 +210,7 @@ class Server {
std::string GetLastRandomKeyCursor();
void SetLastRandomKeyCursor(const std::string &cursor);
- static int GetCachedUnixTime();
+ static int64_t GetCachedUnixTime();
void GetStatsInfo(std::string *info);
void GetServerInfo(std::string *info);
void GetMemoryInfo(std::string *info);
@@ -271,7 +271,7 @@ class Server {
Stats stats;
engine::Storage *storage;
std::unique_ptr<Cluster> cluster;
- static inline std::atomic<int> unix_time = 0;
+ static inline std::atomic<int64_t> unix_time = 0;
std::unique_ptr<SlotMigrator> slot_migrator;
std::unique_ptr<SlotImport> slot_import;
@@ -323,9 +323,9 @@ class Server {
std::mutex db_job_mu_;
bool db_compacting_ = false;
bool is_bgsave_in_progress_ = false;
- int last_bgsave_time_ = -1;
+ int64_t last_bgsave_time_ = -1;
std::string last_bgsave_status_ = "ok";
- int last_bgsave_time_sec_ = -1;
+ int64_t last_bgsave_time_sec_ = -1;
std::map<std::string, DBScanInfo> db_scan_infos_;
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 4157866f..42288329 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -153,8 +153,7 @@ Metadata::Metadata(RedisType type, bool generate_version, bool use_64bit_common_
version(generate_version ? generateVersion() : 0),
size(0) {}
-rocksdb::Status Metadata::Decode(const std::string &bytes) {
- Slice input(bytes);
+rocksdb::Status Metadata::Decode(Slice input) {
if (!GetFixed8(&input, &flags)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
@@ -324,8 +323,7 @@ void ListMetadata::Encode(std::string *dst) {
PutFixed64(dst, tail);
}
-rocksdb::Status ListMetadata::Decode(const std::string &bytes) {
- Slice input(bytes);
+rocksdb::Status ListMetadata::Decode(Slice input) {
GetFixed8(&input, &flags);
GetExpire(&input);
if (Type() != kRedisString) {
@@ -362,8 +360,7 @@ void StreamMetadata::Encode(std::string *dst) {
PutFixed64(dst, entries_added);
}
-rocksdb::Status StreamMetadata::Decode(const std::string &bytes) {
- Slice input(bytes);
+rocksdb::Status StreamMetadata::Decode(Slice input) {
if (!GetFixed8(&input, &flags)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 5f6d6848..6c76496c 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -31,16 +31,18 @@
constexpr bool USE_64BIT_COMMON_FIELD_DEFAULT = METADATA_ENCODING_VERSION != 0;
+// We write the integer value of every datatype enum explicitly,
+// since it cannot be changed once confirmed
enum RedisType {
- kRedisNone,
- kRedisString,
- kRedisHash,
- kRedisList,
- kRedisSet,
- kRedisZSet,
- kRedisBitmap,
- kRedisSortedint,
- kRedisStream,
+ kRedisNone = 0,
+ kRedisString = 1,
+ kRedisHash = 2,
+ kRedisList = 3,
+ kRedisSet = 4,
+ kRedisZSet = 5,
+ kRedisBitmap = 6,
+ kRedisSortedint = 7,
+ kRedisStream = 8,
};
enum RedisCommand {
@@ -142,7 +144,7 @@ class Metadata {
bool Expired() const;
bool ExpireAt(uint64_t expired_ts) const;
virtual void Encode(std::string *dst);
- virtual rocksdb::Status Decode(const std::string &bytes);
+ virtual rocksdb::Status Decode(Slice input);
bool operator==(const Metadata &that) const;
private:
@@ -181,7 +183,7 @@ class ListMetadata : public Metadata {
explicit ListMetadata(bool generate_version = true);
void Encode(std::string *dst) override;
- rocksdb::Status Decode(const std::string &bytes) override;
+ rocksdb::Status Decode(Slice input) override;
};
class StreamMetadata : public Metadata {
@@ -196,5 +198,5 @@ class StreamMetadata : public Metadata {
explicit StreamMetadata(bool generate_version = true) : Metadata(kRedisStream, generate_version) {}
void Encode(std::string *dst) override;
- rocksdb::Status Decode(const std::string &bytes) override;
+ rocksdb::Status Decode(Slice input) override;
};
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index d566da10..775ffe41 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -643,6 +643,9 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
rocksdb::Status Storage::Compact(const Slice *begin, const Slice *end) {
rocksdb::CompactRangeOptions compact_opts;
compact_opts.change_level = true;
+ // For manual compaction, we would like to force the bottommost level to be compacted.
+ // Otherwise it may use the trivial mode, and some expired key-values may still exist in the bottommost level.
+ compact_opts.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForceOptimized;
for (const auto &cf_handle : cf_handles_) {
rocksdb::Status s = db_->CompactRange(compact_opts, cf_handle, begin, end);
if (!s.ok()) return s;
diff --git a/src/storage/table_properties_collector.cc b/src/storage/table_properties_collector.cc
index 4bb99816..c414289e 100644
--- a/src/storage/table_properties_collector.cc
+++ b/src/storage/table_properties_collector.cc
@@ -30,10 +30,6 @@
rocksdb::Status CompactOnExpiredCollector::AddUserKey(const rocksdb::Slice &key, const rocksdb::Slice &value,
rocksdb::EntryType entry_type, rocksdb::SequenceNumber,
uint64_t) {
- uint8_t type = 0;
- uint32_t expired = 0, subkeys = 0;
- uint64_t version = 0;
-
if (start_key_.empty()) {
start_key_ = key.ToString();
}
@@ -47,23 +43,18 @@ rocksdb::Status CompactOnExpiredCollector::AddUserKey(const rocksdb::Slice &key,
if (cf_name_ != "metadata") {
return rocksdb::Status::OK();
}
- rocksdb::Slice cv = value;
- if (entry_type != rocksdb::kEntryPut || cv.size() < 5) {
+
+ if (entry_type != rocksdb::kEntryPut || value.size() < 5) {
return rocksdb::Status::OK();
}
- GetFixed8(&cv, &type);
- GetFixed32(&cv, &expired);
- type = type & (uint8_t)0x0f;
- if (type == kRedisBitmap || type == kRedisSet || type == kRedisList || type == kRedisHash || type == kRedisZSet ||
- type == kRedisSortedint) {
- if (cv.size() <= 12) return rocksdb::Status::OK();
- GetFixed64(&cv, &version);
- GetFixed32(&cv, &subkeys);
- }
- total_keys_ += subkeys;
- int now = Server::GetCachedUnixTime();
- if ((expired > 0 && expired < static_cast<uint32_t>(now)) || (type != kRedisString && subkeys == 0)) {
- deleted_keys_ += subkeys + 1;
+
+ Metadata metadata(RedisType::kRedisNone);
+ auto s = metadata.Decode(value);
+ if (!s.ok()) return rocksdb::Status::OK();
+
+ total_keys_ += metadata.size;
+ if (metadata.ExpireAt(Server::GetCachedUnixTime() * 1000)) {
+ deleted_keys_ += metadata.size + 1;
}
return rocksdb::Status::OK();
}
diff --git a/src/storage/table_properties_collector.h b/src/storage/table_properties_collector.h
index a0dfd0df..b640139b 100644
--- a/src/storage/table_properties_collector.h
+++ b/src/storage/table_properties_collector.h
@@ -40,8 +40,8 @@ class CompactOnExpiredCollector : public rocksdb::TablePropertiesCollector {
private:
std::string cf_name_;
float trigger_threshold_;
- int64_t total_keys_ = 0;
- int64_t deleted_keys_ = 0;
+ uint64_t total_keys_ = 0;
+ uint64_t deleted_keys_ = 0;
std::string start_key_;
std::string stop_key_;
};