You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by tw...@apache.org on 2023/08/26 14:43:58 UTC

[kvrocks] branch unstable updated: Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)

This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 6bcd387f Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)
6bcd387f is described below

commit 6bcd387f7f91f94cc0a18b08ec9d190ca528a115
Author: Twice <tw...@gmail.com>
AuthorDate: Sat Aug 26 23:43:53 2023 +0900

    Fix wrong metadata parsing in CompactOnExpiredCollector (#1703)
    
    Co-authored-by: hulk <hu...@gmail.com>
---
 src/server/server.cc                      | 14 +++++---------
 src/server/server.h                       |  8 ++++----
 src/storage/redis_metadata.cc             |  9 +++------
 src/storage/redis_metadata.h              | 26 ++++++++++++++------------
 src/storage/storage.cc                    |  3 +++
 src/storage/table_properties_collector.cc | 29 ++++++++++-------------------
 src/storage/table_properties_collector.h  |  4 ++--
 7 files changed, 41 insertions(+), 52 deletions(-)

diff --git a/src/server/server.cc b/src/server/server.cc
index b07821e7..b00decf6 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -621,11 +621,7 @@ void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key,
   }
 }
 
-void Server::updateCachedTime() {
-  time_t ret = util::GetTimeStamp();
-  if (ret == -1) return;
-  unix_time.store(static_cast<int>(ret));
-}
+void Server::updateCachedTime() { unix_time.store(util::GetTimeStamp()); }
 
 int Server::IncrClientNum() {
   total_clients_.fetch_add(1, std::memory_order::memory_order_relaxed);
@@ -983,9 +979,9 @@ void Server::SetLastRandomKeyCursor(const std::string &cursor) {
   last_random_key_cursor_ = cursor;
 }
 
-int Server::GetCachedUnixTime() {
+int64_t Server::GetCachedUnixTime() {
   if (unix_time.load() == 0) {
-    unix_time.store(static_cast<int>(util::GetTimeStamp()));
+    updateCachedTime();
   }
   return unix_time.load();
 }
@@ -1274,9 +1270,9 @@ Status Server::AsyncBgSaveDB() {
 
     std::lock_guard<std::mutex> lg(db_job_mu_);
     is_bgsave_in_progress_ = false;
-    last_bgsave_time_ = static_cast<int>(start_bgsave_time);
+    last_bgsave_time_ = start_bgsave_time;
     last_bgsave_status_ = s.IsOK() ? "ok" : "err";
-    last_bgsave_time_sec_ = static_cast<int>(stop_bgsave_time - start_bgsave_time);
+    last_bgsave_time_sec_ = stop_bgsave_time - start_bgsave_time;
   });
 }
 
diff --git a/src/server/server.h b/src/server/server.h
index 7ac6128a..b598e520 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -210,7 +210,7 @@ class Server {
   std::string GetLastRandomKeyCursor();
   void SetLastRandomKeyCursor(const std::string &cursor);
 
-  static int GetCachedUnixTime();
+  static int64_t GetCachedUnixTime();
   void GetStatsInfo(std::string *info);
   void GetServerInfo(std::string *info);
   void GetMemoryInfo(std::string *info);
@@ -271,7 +271,7 @@ class Server {
   Stats stats;
   engine::Storage *storage;
   std::unique_ptr<Cluster> cluster;
-  static inline std::atomic<int> unix_time = 0;
+  static inline std::atomic<int64_t> unix_time = 0;
   std::unique_ptr<SlotMigrator> slot_migrator;
   std::unique_ptr<SlotImport> slot_import;
 
@@ -323,9 +323,9 @@ class Server {
   std::mutex db_job_mu_;
   bool db_compacting_ = false;
   bool is_bgsave_in_progress_ = false;
-  int last_bgsave_time_ = -1;
+  int64_t last_bgsave_time_ = -1;
   std::string last_bgsave_status_ = "ok";
-  int last_bgsave_time_sec_ = -1;
+  int64_t last_bgsave_time_sec_ = -1;
 
   std::map<std::string, DBScanInfo> db_scan_infos_;
 
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 4157866f..42288329 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -153,8 +153,7 @@ Metadata::Metadata(RedisType type, bool generate_version, bool use_64bit_common_
       version(generate_version ? generateVersion() : 0),
       size(0) {}
 
-rocksdb::Status Metadata::Decode(const std::string &bytes) {
-  Slice input(bytes);
+rocksdb::Status Metadata::Decode(Slice input) {
   if (!GetFixed8(&input, &flags)) {
     return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
   }
@@ -324,8 +323,7 @@ void ListMetadata::Encode(std::string *dst) {
   PutFixed64(dst, tail);
 }
 
-rocksdb::Status ListMetadata::Decode(const std::string &bytes) {
-  Slice input(bytes);
+rocksdb::Status ListMetadata::Decode(Slice input) {
   GetFixed8(&input, &flags);
   GetExpire(&input);
   if (Type() != kRedisString) {
@@ -362,8 +360,7 @@ void StreamMetadata::Encode(std::string *dst) {
   PutFixed64(dst, entries_added);
 }
 
-rocksdb::Status StreamMetadata::Decode(const std::string &bytes) {
-  Slice input(bytes);
+rocksdb::Status StreamMetadata::Decode(Slice input) {
   if (!GetFixed8(&input, &flags)) {
     return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
   }
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 5f6d6848..6c76496c 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -31,16 +31,18 @@
 
 constexpr bool USE_64BIT_COMMON_FIELD_DEFAULT = METADATA_ENCODING_VERSION != 0;
 
+// We write the enum integer value of every datatype
+// explicitly since it cannot be changed once confirmed.
 enum RedisType {
-  kRedisNone,
-  kRedisString,
-  kRedisHash,
-  kRedisList,
-  kRedisSet,
-  kRedisZSet,
-  kRedisBitmap,
-  kRedisSortedint,
-  kRedisStream,
+  kRedisNone = 0,
+  kRedisString = 1,
+  kRedisHash = 2,
+  kRedisList = 3,
+  kRedisSet = 4,
+  kRedisZSet = 5,
+  kRedisBitmap = 6,
+  kRedisSortedint = 7,
+  kRedisStream = 8,
 };
 
 enum RedisCommand {
@@ -142,7 +144,7 @@ class Metadata {
   bool Expired() const;
   bool ExpireAt(uint64_t expired_ts) const;
   virtual void Encode(std::string *dst);
-  virtual rocksdb::Status Decode(const std::string &bytes);
+  virtual rocksdb::Status Decode(Slice input);
   bool operator==(const Metadata &that) const;
 
  private:
@@ -181,7 +183,7 @@ class ListMetadata : public Metadata {
   explicit ListMetadata(bool generate_version = true);
 
   void Encode(std::string *dst) override;
-  rocksdb::Status Decode(const std::string &bytes) override;
+  rocksdb::Status Decode(Slice input) override;
 };
 
 class StreamMetadata : public Metadata {
@@ -196,5 +198,5 @@ class StreamMetadata : public Metadata {
   explicit StreamMetadata(bool generate_version = true) : Metadata(kRedisStream, generate_version) {}
 
   void Encode(std::string *dst) override;
-  rocksdb::Status Decode(const std::string &bytes) override;
+  rocksdb::Status Decode(Slice input) override;
 };
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index d566da10..775ffe41 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -643,6 +643,9 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) {
 rocksdb::Status Storage::Compact(const Slice *begin, const Slice *end) {
   rocksdb::CompactRangeOptions compact_opts;
   compact_opts.change_level = true;
+  // For the manual compaction, we would like to force the bottommost level to be compacted.
+  // Otherwise it may use the trivial mode, and some expired key-values would still exist in the bottommost level.
+  compact_opts.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForceOptimized;
   for (const auto &cf_handle : cf_handles_) {
     rocksdb::Status s = db_->CompactRange(compact_opts, cf_handle, begin, end);
     if (!s.ok()) return s;
diff --git a/src/storage/table_properties_collector.cc b/src/storage/table_properties_collector.cc
index 4bb99816..c414289e 100644
--- a/src/storage/table_properties_collector.cc
+++ b/src/storage/table_properties_collector.cc
@@ -30,10 +30,6 @@
 rocksdb::Status CompactOnExpiredCollector::AddUserKey(const rocksdb::Slice &key, const rocksdb::Slice &value,
                                                       rocksdb::EntryType entry_type, rocksdb::SequenceNumber,
                                                       uint64_t) {
-  uint8_t type = 0;
-  uint32_t expired = 0, subkeys = 0;
-  uint64_t version = 0;
-
   if (start_key_.empty()) {
     start_key_ = key.ToString();
   }
@@ -47,23 +43,18 @@ rocksdb::Status CompactOnExpiredCollector::AddUserKey(const rocksdb::Slice &key,
   if (cf_name_ != "metadata") {
     return rocksdb::Status::OK();
   }
-  rocksdb::Slice cv = value;
-  if (entry_type != rocksdb::kEntryPut || cv.size() < 5) {
+
+  if (entry_type != rocksdb::kEntryPut || value.size() < 5) {
     return rocksdb::Status::OK();
   }
-  GetFixed8(&cv, &type);
-  GetFixed32(&cv, &expired);
-  type = type & (uint8_t)0x0f;
-  if (type == kRedisBitmap || type == kRedisSet || type == kRedisList || type == kRedisHash || type == kRedisZSet ||
-      type == kRedisSortedint) {
-    if (cv.size() <= 12) return rocksdb::Status::OK();
-    GetFixed64(&cv, &version);
-    GetFixed32(&cv, &subkeys);
-  }
-  total_keys_ += subkeys;
-  int now = Server::GetCachedUnixTime();
-  if ((expired > 0 && expired < static_cast<uint32_t>(now)) || (type != kRedisString && subkeys == 0)) {
-    deleted_keys_ += subkeys + 1;
+
+  Metadata metadata(RedisType::kRedisNone);
+  auto s = metadata.Decode(value);
+  if (!s.ok()) return rocksdb::Status::OK();
+
+  total_keys_ += metadata.size;
+  if (metadata.ExpireAt(Server::GetCachedUnixTime() * 1000)) {
+    deleted_keys_ += metadata.size + 1;
   }
   return rocksdb::Status::OK();
 }
diff --git a/src/storage/table_properties_collector.h b/src/storage/table_properties_collector.h
index a0dfd0df..b640139b 100644
--- a/src/storage/table_properties_collector.h
+++ b/src/storage/table_properties_collector.h
@@ -40,8 +40,8 @@ class CompactOnExpiredCollector : public rocksdb::TablePropertiesCollector {
  private:
   std::string cf_name_;
   float trigger_threshold_;
-  int64_t total_keys_ = 0;
-  int64_t deleted_keys_ = 0;
+  uint64_t total_keys_ = 0;
+  uint64_t deleted_keys_ = 0;
   std::string start_key_;
   std::string stop_key_;
 };