You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by to...@apache.org on 2023/01/11 15:38:45 UTC

[incubator-kvrocks] 02/02: Tidy some Kvrocks2Redis code

This is an automated email from the ASF dual-hosted git repository.

torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git

commit 35bce1aec8e7358faaacf686ba81b77ab238af3c
Author: Yaroslav Stepanchuk <to...@gmail.com>
AuthorDate: Tue Jan 10 19:05:37 2023 +0200

    Tidy some Kvrocks2Redis code
---
 utils/kvrocks2redis/parser.cc       | 80 ++++++++++++++++++++++---------------
 utils/kvrocks2redis/parser.h        | 16 ++++----
 utils/kvrocks2redis/redis_writer.cc | 11 +++++
 utils/kvrocks2redis/sync.cc         | 10 +++--
 4 files changed, 73 insertions(+), 44 deletions(-)

diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index cb74ae94..1414183b 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -28,14 +28,15 @@
 #include "cluster/redis_slot.h"
 #include "db_util.h"
 #include "server/redis_reply.h"
+#include "types/redis_string.h"
 
 Status Parser::ParseFullDB() {
   rocksdb::DB *db_ = storage_->GetDB();
-  if (!lastest_snapshot_) lastest_snapshot_ = std::make_unique<LatestSnapShot>(db_);
-  rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = storage_->GetCFHandle("metadata");
+  if (!latest_snapshot_) latest_snapshot_ = std::make_unique<LatestSnapShot>(db_);
+  rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = storage_->GetCFHandle(Engine::kMetadataColumnFamilyName);
 
   rocksdb::ReadOptions read_options;
-  read_options.snapshot = lastest_snapshot_->GetSnapShot();
+  read_options.snapshot = latest_snapshot_->GetSnapShot();
   read_options.fill_cache = false;
   std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(read_options, metadata_cf_handle_));
   Status s;
@@ -46,6 +47,7 @@ Status Parser::ParseFullDB() {
     if (metadata.Expired()) {  // ignore the expired key
       continue;
     }
+
     if (metadata.Type() == kRedisString) {
       s = parseSimpleKV(iter->key(), iter->value(), metadata.expire);
     } else {
@@ -53,51 +55,57 @@ Status Parser::ParseFullDB() {
     }
     if (!s.IsOK()) return s;
   }
+
   return Status::OK();
 }
 
 Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, int expire) {
-  std::string op, ns, user_key;
-  ExtractNamespaceKey(ns_key, &ns, &user_key, is_slotid_encoded_);
-  std::string output;
-  output = Redis::Command2RESP({"SET", user_key, value.ToString().substr(5, value.size() - 5)});
-  Status s = writer_->Write(ns, {output});
+  std::string ns, user_key;
+  ExtractNamespaceKey(ns_key, &ns, &user_key, slot_id_encoded_);
+
+  auto command = Redis::Command2RESP(
+      {"SET", user_key, value.ToString().substr(Redis::STRING_HDR_SIZE, value.size() - Redis::STRING_HDR_SIZE)});
+  Status s = writer_->Write(ns, {command});
   if (!s.IsOK()) return s;
 
   if (expire > 0) {
-    output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire)});
-    s = writer_->Write(ns, {output});
+    command = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire)});
+    s = writer_->Write(ns, {command});
   }
+
   return s;
 }
 
 Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
   RedisType type = metadata.Type();
   if (type < kRedisHash || type > kRedisSortedint) {
-    return Status(Status::NotOK, "unknown metadata type: " + std::to_string(type));
+    return {Status::NotOK, "unknown metadata type: " + std::to_string(type)};
   }
 
-  std::string ns, prefix_key, user_key, sub_key, value, output, next_version_prefix_key;
-  ExtractNamespaceKey(ns_key, &ns, &user_key, is_slotid_encoded_);
-  InternalKey(ns_key, "", metadata.version, is_slotid_encoded_).Encode(&prefix_key);
-  InternalKey(ns_key, "", metadata.version + 1, is_slotid_encoded_).Encode(&next_version_prefix_key);
+  std::string ns, user_key;
+  ExtractNamespaceKey(ns_key, &ns, &user_key, slot_id_encoded_);
+  std::string prefix_key;
+  InternalKey(ns_key, "", metadata.version, slot_id_encoded_).Encode(&prefix_key);
+  std::string next_version_prefix_key;
+  InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded_).Encode(&next_version_prefix_key);
 
   rocksdb::DB *db_ = storage_->GetDB();
   rocksdb::ReadOptions read_options;
-  read_options.snapshot = lastest_snapshot_->GetSnapShot();
+  read_options.snapshot = latest_snapshot_->GetSnapShot();
   rocksdb::Slice upper_bound(next_version_prefix_key);
   read_options.iterate_upper_bound = &upper_bound;
   read_options.fill_cache = false;
 
+  std::string output;
   auto iter = DBUtil::UniqueIterator(db_, read_options);
   for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
     if (!iter->key().starts_with(prefix_key)) {
       break;
     }
-    Status s;
-    InternalKey ikey(iter->key(), is_slotid_encoded_);
-    sub_key = ikey.GetSubKey().ToString();
-    value = iter->value().ToString();
+
+    InternalKey ikey(iter->key(), slot_id_encoded_);
+    std::string sub_key = ikey.GetSubKey().ToString();
+    std::string value = iter->value().ToString();
     switch (type) {
       case kRedisHash:
         output = Redis::Command2RESP({"HSET", user_key, sub_key, value});
@@ -110,12 +118,13 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
         break;
       case kRedisZSet: {
         double score = DecodeDouble(value.data());
-        output = Redis::Command2RESP({"ZADD", user_key, std::to_string(score), sub_key});
+        output = Redis::Command2RESP({"ZADD", user_key, Util::Float2String(score), sub_key});
         break;
       }
       case kRedisBitmap: {
         int index = std::stoi(sub_key);
-        s = Parser::parseBitmapSegment(ns, user_key, index, value);
+        auto s = Parser::parseBitmapSegment(ns, user_key, index, value);
+        if (!s.IsOK()) return s.Prefixed("failed to parse bitmap segment");
         break;
       }
       case kRedisSortedint: {
@@ -126,16 +135,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
       default:
         break;  // should never get here
     }
+
     if (type != kRedisBitmap) {
-      s = writer_->Write(ns, {output});
+      auto s = writer_->Write(ns, {output});
+      if (!s.IsOK()) return s.Prefixed(fmt::format("failed to write the '{}' command to AOF", output));
     }
-    if (!s.IsOK()) return s;
   }
 
   if (metadata.expire > 0) {
     output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire)});
     Status s = writer_->Write(ns, {output});
-    if (!s.IsOK()) return s;
+    if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF");
   }
 
   return Status::OK();
@@ -145,30 +155,34 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in
   Status s;
   for (size_t i = 0; i < bitmap.size(); i++) {
     if (bitmap[i] == 0) continue;  // ignore zero byte
+
     for (int j = 0; j < 8; j++) {
       if (!(bitmap[i] & (1 << j))) continue;  // ignore zero bit
+
       s = writer_->Write(
           ns.ToString(),
           {Redis::Command2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
-      if (!s.IsOK()) return s;
+      if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF");
     }
   }
   return Status::OK();
 }
 
-rocksdb::Status Parser::ParseWriteBatch(const std::string &batch_string) {
+Status Parser::ParseWriteBatch(const std::string &batch_string) {
   rocksdb::WriteBatch write_batch(batch_string);
-  WriteBatchExtractor write_batch_extractor(is_slotid_encoded_, -1, true);
-  rocksdb::Status status;
+  WriteBatchExtractor write_batch_extractor(slot_id_encoded_, -1, true);
+
+  auto db_status = write_batch.Iterate(&write_batch_extractor);
+  if (!db_status.ok())
+    return {Status::NotOK, fmt::format("failed to iterate over the write batch: {}", db_status.ToString())};
 
-  status = write_batch.Iterate(&write_batch_extractor);
-  if (!status.ok()) return status;
   auto resp_commands = write_batch_extractor.GetRESPCommands();
   for (const auto &iter : *resp_commands) {
     auto s = writer_->Write(iter.first, iter.second);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[kvrocks2redis] Failed to parse WriteBatch, encounter error: " << s.Msg();
+      LOG(ERROR) << "[kvrocks2redis] Failed to write to AOF from the write batch. Error: " << s.Msg();
     }
   }
-  return rocksdb::Status::OK();
+
+  return Status::OK();
 }
diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h
index 493d3e00..2536a868 100644
--- a/utils/kvrocks2redis/parser.h
+++ b/utils/kvrocks2redis/parser.h
@@ -35,7 +35,7 @@
 
 class LatestSnapShot {
  public:
-  explicit LatestSnapShot(rocksdb::DB *db) : db_(db) { snapshot_ = db_->GetSnapshot(); }
+  explicit LatestSnapShot(rocksdb::DB *db) : db_(db), snapshot_(db_->GetSnapshot()) {}
   ~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }
   const rocksdb::Snapshot *GetSnapShot() { return snapshot_; }
 
@@ -46,20 +46,20 @@ class LatestSnapShot {
 
 class Parser {
  public:
-  explicit Parser(Engine::Storage *storage, Writer *writer) : storage_(storage), writer_(writer) {
-    lastest_snapshot_ = std::unique_ptr<LatestSnapShot>(new LatestSnapShot(storage->GetDB()));
-    is_slotid_encoded_ = storage_->IsSlotIdEncoded();
+  explicit Parser(Engine::Storage *storage, Writer *writer)
+      : storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {
+    latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
   }
-  ~Parser() {}
+  ~Parser() = default;
 
   Status ParseFullDB();
-  rocksdb::Status ParseWriteBatch(const std::string &batch_string);
+  Status ParseWriteBatch(const std::string &batch_string);
 
  protected:
   Engine::Storage *storage_ = nullptr;
   Writer *writer_ = nullptr;
-  std::unique_ptr<LatestSnapShot> lastest_snapshot_ = nullptr;
-  bool is_slotid_encoded_ = false;
+  std::unique_ptr<LatestSnapShot> latest_snapshot_;
+  bool slot_id_encoded_ = false;
 
   Status parseSimpleKV(const Slice &ns_key, const Slice &value, int expire);
   Status parseComplexKV(const Slice &ns_key, const Metadata &metadata);
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 6d3c845b..4ac41a26 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -106,11 +106,13 @@ void RedisWriter::sync() {
         LOG(ERROR) << s.Msg();
         continue;
       }
+
       s = getRedisConn(iter.first, iter.second.host, iter.second.port, iter.second.auth, iter.second.db_number);
       if (!s.IsOK()) {
         LOG(ERROR) << s.Msg();
         continue;
       }
+
       while (true) {
         auto getted_line_leng = pread(aof_fds_[iter.first], buffer, chunk_size, next_offsets_[iter.first]);
         if (getted_line_leng <= 0) {
@@ -119,17 +121,20 @@ void RedisWriter::sync() {
           }
           break;
         }
+
         std::string con = std::string(buffer, getted_line_leng);
         s = Util::SockSend(redis_fds_[iter.first], std::string(buffer, getted_line_leng));
         if (!s.IsOK()) {
           LOG(ERROR) << "ERR send data to redis err: " << s.Msg();
           break;
         }
+
         auto line_state = Util::SockReadLine(redis_fds_[iter.first]);
         if (!line_state) {
           LOG(ERROR) << "read redis response err: " << s.Msg();
           break;
         }
+
         std::string line = *line_state;
         if (line.compare(0, 1, "-") == 0) {
           // Ooops, something went wrong , sync process has been terminated, administrator should be notified
@@ -139,6 +144,7 @@ void RedisWriter::sync() {
           Stop();
           return;
         }
+
         s = updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
         if (!s.IsOK()) {
           LOG(ERROR) << "ERR updating next offset: " << s.Msg();
@@ -148,6 +154,7 @@ void RedisWriter::sync() {
       std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
+
   delete[] buffer;
 }
 
@@ -165,6 +172,7 @@ Status RedisWriter::getRedisConn(const std::string &ns, const std::string &host,
         return s;
       }
     }
+
     if (db_index != 0) {
       auto s = selectDB(ns, db_index);
       if (!s.IsOK()) {
@@ -189,6 +197,7 @@ Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
   if (line.compare(0, 3, "+OK") != 0) {
     return {Status::NotOK, "[kvrocks2redis] redis Auth failed: " + line};
   }
+
   return Status::OK();
 }
 
@@ -200,11 +209,13 @@ Status RedisWriter::selectDB(const std::string &ns, int db_number) {
   if (!s.IsOK()) {
     return s.Prefixed("failed to send SELECT command to socket");
   }
+
   LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for response";
   std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response err"));
   if (line.compare(0, 3, "+OK") != 0) {
     return {Status::NotOK, "[kvrocks2redis] redis select db failed: " + line};
   }
+
   return Status::OK();
 }
 
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index e70c2b59..57d06d9b 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -155,7 +155,7 @@ Status Sync::incrementBatchLoop() {
   while (!IsStopped()) {
     if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
       evbuffer_free(evbuf);
-      return {Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)};
+      return {Status::NotOK, std::string("[kvrocks2redis] read increment batch err: ") + strerror(errno)};
     }
     if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
       // Read bulk length
@@ -180,8 +180,12 @@ Status Sync::incrementBatchLoop() {
         if (bulk_data_str != "ping") {
           auto bat = rocksdb::WriteBatch(bulk_data_str);
           int count = static_cast<int>(bat.Count());
-          parser_->ParseWriteBatch(bulk_data_str);
-          auto s = updateNextSeq(next_seq_ + count);
+          auto s = parser_->ParseWriteBatch(bulk_data_str);
+          if (!s.IsOK()) {
+            return s.Prefixed(fmt::format("failed to parse write batch '{}'", Util::StringToHex(bulk_data_str)));
+          }
+
+          s = updateNextSeq(next_seq_ + count);
           if (!s.IsOK()) {
             return s.Prefixed("failed to update next sequence");
           }