You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by to...@apache.org on 2023/01/11 15:38:45 UTC
[incubator-kvrocks] 02/02: Tidy some Kvrocks2Redis code
This is an automated email from the ASF dual-hosted git repository.
torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
commit 35bce1aec8e7358faaacf686ba81b77ab238af3c
Author: Yaroslav Stepanchuk <to...@gmail.com>
AuthorDate: Tue Jan 10 19:05:37 2023 +0200
Tidy some Kvrocks2Redis code
---
utils/kvrocks2redis/parser.cc | 80 ++++++++++++++++++++++---------------
utils/kvrocks2redis/parser.h | 16 ++++----
utils/kvrocks2redis/redis_writer.cc | 11 +++++
utils/kvrocks2redis/sync.cc | 10 +++--
4 files changed, 73 insertions(+), 44 deletions(-)
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index cb74ae94..1414183b 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -28,14 +28,15 @@
#include "cluster/redis_slot.h"
#include "db_util.h"
#include "server/redis_reply.h"
+#include "types/redis_string.h"
Status Parser::ParseFullDB() {
rocksdb::DB *db_ = storage_->GetDB();
- if (!lastest_snapshot_) lastest_snapshot_ = std::make_unique<LatestSnapShot>(db_);
- rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = storage_->GetCFHandle("metadata");
+ if (!latest_snapshot_) latest_snapshot_ = std::make_unique<LatestSnapShot>(db_);
+ rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = storage_->GetCFHandle(Engine::kMetadataColumnFamilyName);
rocksdb::ReadOptions read_options;
- read_options.snapshot = lastest_snapshot_->GetSnapShot();
+ read_options.snapshot = latest_snapshot_->GetSnapShot();
read_options.fill_cache = false;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(read_options, metadata_cf_handle_));
Status s;
@@ -46,6 +47,7 @@ Status Parser::ParseFullDB() {
if (metadata.Expired()) { // ignore the expired key
continue;
}
+
if (metadata.Type() == kRedisString) {
s = parseSimpleKV(iter->key(), iter->value(), metadata.expire);
} else {
@@ -53,51 +55,57 @@ Status Parser::ParseFullDB() {
}
if (!s.IsOK()) return s;
}
+
return Status::OK();
}
Status Parser::parseSimpleKV(const Slice &ns_key, const Slice &value, int expire) {
- std::string op, ns, user_key;
- ExtractNamespaceKey(ns_key, &ns, &user_key, is_slotid_encoded_);
- std::string output;
- output = Redis::Command2RESP({"SET", user_key, value.ToString().substr(5, value.size() - 5)});
- Status s = writer_->Write(ns, {output});
+ std::string ns, user_key;
+ ExtractNamespaceKey(ns_key, &ns, &user_key, slot_id_encoded_);
+
+ auto command = Redis::Command2RESP(
+ {"SET", user_key, value.ToString().substr(Redis::STRING_HDR_SIZE, value.size() - Redis::STRING_HDR_SIZE)});
+ Status s = writer_->Write(ns, {command});
if (!s.IsOK()) return s;
if (expire > 0) {
- output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire)});
- s = writer_->Write(ns, {output});
+ command = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(expire)});
+ s = writer_->Write(ns, {command});
}
+
return s;
}
Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
RedisType type = metadata.Type();
if (type < kRedisHash || type > kRedisSortedint) {
- return Status(Status::NotOK, "unknown metadata type: " + std::to_string(type));
+ return {Status::NotOK, "unknown metadata type: " + std::to_string(type)};
}
- std::string ns, prefix_key, user_key, sub_key, value, output, next_version_prefix_key;
- ExtractNamespaceKey(ns_key, &ns, &user_key, is_slotid_encoded_);
- InternalKey(ns_key, "", metadata.version, is_slotid_encoded_).Encode(&prefix_key);
- InternalKey(ns_key, "", metadata.version + 1, is_slotid_encoded_).Encode(&next_version_prefix_key);
+ std::string ns, user_key;
+ ExtractNamespaceKey(ns_key, &ns, &user_key, slot_id_encoded_);
+ std::string prefix_key;
+ InternalKey(ns_key, "", metadata.version, slot_id_encoded_).Encode(&prefix_key);
+ std::string next_version_prefix_key;
+ InternalKey(ns_key, "", metadata.version + 1, slot_id_encoded_).Encode(&next_version_prefix_key);
rocksdb::DB *db_ = storage_->GetDB();
rocksdb::ReadOptions read_options;
- read_options.snapshot = lastest_snapshot_->GetSnapShot();
+ read_options.snapshot = latest_snapshot_->GetSnapShot();
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
read_options.fill_cache = false;
+ std::string output;
auto iter = DBUtil::UniqueIterator(db_, read_options);
for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
if (!iter->key().starts_with(prefix_key)) {
break;
}
- Status s;
- InternalKey ikey(iter->key(), is_slotid_encoded_);
- sub_key = ikey.GetSubKey().ToString();
- value = iter->value().ToString();
+
+ InternalKey ikey(iter->key(), slot_id_encoded_);
+ std::string sub_key = ikey.GetSubKey().ToString();
+ std::string value = iter->value().ToString();
switch (type) {
case kRedisHash:
output = Redis::Command2RESP({"HSET", user_key, sub_key, value});
@@ -110,12 +118,13 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
break;
case kRedisZSet: {
double score = DecodeDouble(value.data());
- output = Redis::Command2RESP({"ZADD", user_key, std::to_string(score), sub_key});
+ output = Redis::Command2RESP({"ZADD", user_key, Util::Float2String(score), sub_key});
break;
}
case kRedisBitmap: {
int index = std::stoi(sub_key);
- s = Parser::parseBitmapSegment(ns, user_key, index, value);
+ auto s = Parser::parseBitmapSegment(ns, user_key, index, value);
+ if (!s.IsOK()) return s.Prefixed("failed to parse bitmap segment");
break;
}
case kRedisSortedint: {
@@ -126,16 +135,17 @@ Status Parser::parseComplexKV(const Slice &ns_key, const Metadata &metadata) {
default:
break; // should never get here
}
+
if (type != kRedisBitmap) {
- s = writer_->Write(ns, {output});
+ auto s = writer_->Write(ns, {output});
+ if (!s.IsOK()) return s.Prefixed(fmt::format("failed to write the '{}' command to AOF", output));
}
- if (!s.IsOK()) return s;
}
if (metadata.expire > 0) {
output = Redis::Command2RESP({"EXPIREAT", user_key, std::to_string(metadata.expire)});
Status s = writer_->Write(ns, {output});
- if (!s.IsOK()) return s;
+ if (!s.IsOK()) return s.Prefixed("failed to write the EXPIREAT command to AOF");
}
return Status::OK();
@@ -145,30 +155,34 @@ Status Parser::parseBitmapSegment(const Slice &ns, const Slice &user_key, int in
Status s;
for (size_t i = 0; i < bitmap.size(); i++) {
if (bitmap[i] == 0) continue; // ignore zero byte
+
for (int j = 0; j < 8; j++) {
if (!(bitmap[i] & (1 << j))) continue; // ignore zero bit
+
s = writer_->Write(
ns.ToString(),
{Redis::Command2RESP({"SETBIT", user_key.ToString(), std::to_string(index * 8 + i * 8 + j), "1"})});
- if (!s.IsOK()) return s;
+ if (!s.IsOK()) return s.Prefixed("failed to write SETBIT command to AOF");
}
}
return Status::OK();
}
-rocksdb::Status Parser::ParseWriteBatch(const std::string &batch_string) {
+Status Parser::ParseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
- WriteBatchExtractor write_batch_extractor(is_slotid_encoded_, -1, true);
- rocksdb::Status status;
+ WriteBatchExtractor write_batch_extractor(slot_id_encoded_, -1, true);
+
+ auto db_status = write_batch.Iterate(&write_batch_extractor);
+ if (!db_status.ok())
+ return {Status::NotOK, fmt::format("failed to iterate over the write batch: {}", db_status.ToString())};
- status = write_batch.Iterate(&write_batch_extractor);
- if (!status.ok()) return status;
auto resp_commands = write_batch_extractor.GetRESPCommands();
for (const auto &iter : *resp_commands) {
auto s = writer_->Write(iter.first, iter.second);
if (!s.IsOK()) {
- LOG(ERROR) << "[kvrocks2redis] Failed to parse WriteBatch, encounter error: " << s.Msg();
+ LOG(ERROR) << "[kvrocks2redis] Failed to write to AOF from the write batch. Error: " << s.Msg();
}
}
- return rocksdb::Status::OK();
+
+ return Status::OK();
}
diff --git a/utils/kvrocks2redis/parser.h b/utils/kvrocks2redis/parser.h
index 493d3e00..2536a868 100644
--- a/utils/kvrocks2redis/parser.h
+++ b/utils/kvrocks2redis/parser.h
@@ -35,7 +35,7 @@
class LatestSnapShot {
public:
- explicit LatestSnapShot(rocksdb::DB *db) : db_(db) { snapshot_ = db_->GetSnapshot(); }
+ explicit LatestSnapShot(rocksdb::DB *db) : db_(db), snapshot_(db_->GetSnapshot()) {}
~LatestSnapShot() { db_->ReleaseSnapshot(snapshot_); }
const rocksdb::Snapshot *GetSnapShot() { return snapshot_; }
@@ -46,20 +46,20 @@ class LatestSnapShot {
class Parser {
public:
- explicit Parser(Engine::Storage *storage, Writer *writer) : storage_(storage), writer_(writer) {
- lastest_snapshot_ = std::unique_ptr<LatestSnapShot>(new LatestSnapShot(storage->GetDB()));
- is_slotid_encoded_ = storage_->IsSlotIdEncoded();
+ explicit Parser(Engine::Storage *storage, Writer *writer)
+ : storage_(storage), writer_(writer), slot_id_encoded_(storage_->IsSlotIdEncoded()) {
+ latest_snapshot_ = std::make_unique<LatestSnapShot>(storage->GetDB());
}
- ~Parser() {}
+ ~Parser() = default;
Status ParseFullDB();
- rocksdb::Status ParseWriteBatch(const std::string &batch_string);
+ Status ParseWriteBatch(const std::string &batch_string);
protected:
Engine::Storage *storage_ = nullptr;
Writer *writer_ = nullptr;
- std::unique_ptr<LatestSnapShot> lastest_snapshot_ = nullptr;
- bool is_slotid_encoded_ = false;
+ std::unique_ptr<LatestSnapShot> latest_snapshot_;
+ bool slot_id_encoded_ = false;
Status parseSimpleKV(const Slice &ns_key, const Slice &value, int expire);
Status parseComplexKV(const Slice &ns_key, const Metadata &metadata);
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 6d3c845b..4ac41a26 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -106,11 +106,13 @@ void RedisWriter::sync() {
LOG(ERROR) << s.Msg();
continue;
}
+
s = getRedisConn(iter.first, iter.second.host, iter.second.port, iter.second.auth, iter.second.db_number);
if (!s.IsOK()) {
LOG(ERROR) << s.Msg();
continue;
}
+
while (true) {
auto getted_line_leng = pread(aof_fds_[iter.first], buffer, chunk_size, next_offsets_[iter.first]);
if (getted_line_leng <= 0) {
@@ -119,17 +121,20 @@ void RedisWriter::sync() {
}
break;
}
+
std::string con = std::string(buffer, getted_line_leng);
s = Util::SockSend(redis_fds_[iter.first], std::string(buffer, getted_line_leng));
if (!s.IsOK()) {
LOG(ERROR) << "ERR send data to redis err: " << s.Msg();
break;
}
+
auto line_state = Util::SockReadLine(redis_fds_[iter.first]);
if (!line_state) {
LOG(ERROR) << "read redis response err: " << s.Msg();
break;
}
+
std::string line = *line_state;
if (line.compare(0, 1, "-") == 0) {
// Ooops, something went wrong , sync process has been terminated, administrator should be notified
@@ -139,6 +144,7 @@ void RedisWriter::sync() {
Stop();
return;
}
+
s = updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
if (!s.IsOK()) {
LOG(ERROR) << "ERR updating next offset: " << s.Msg();
@@ -148,6 +154,7 @@ void RedisWriter::sync() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
+
delete[] buffer;
}
@@ -165,6 +172,7 @@ Status RedisWriter::getRedisConn(const std::string &ns, const std::string &host,
return s;
}
}
+
if (db_index != 0) {
auto s = selectDB(ns, db_index);
if (!s.IsOK()) {
@@ -189,6 +197,7 @@ Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
if (line.compare(0, 3, "+OK") != 0) {
return {Status::NotOK, "[kvrocks2redis] redis Auth failed: " + line};
}
+
return Status::OK();
}
@@ -200,11 +209,13 @@ Status RedisWriter::selectDB(const std::string &ns, int db_number) {
if (!s.IsOK()) {
return s.Prefixed("failed to send SELECT command to socket");
}
+
LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for response";
std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response err"));
if (line.compare(0, 3, "+OK") != 0) {
return {Status::NotOK, "[kvrocks2redis] redis select db failed: " + line};
}
+
return Status::OK();
}
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index e70c2b59..57d06d9b 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -155,7 +155,7 @@ Status Sync::incrementBatchLoop() {
while (!IsStopped()) {
if (evbuffer_read(evbuf, sock_fd_, -1) <= 0) {
evbuffer_free(evbuf);
- return {Status::NotOK, std::string("[kvrocks2redis] read increament batch err: ") + strerror(errno)};
+ return {Status::NotOK, std::string("[kvrocks2redis] read increment batch err: ") + strerror(errno)};
}
if (incr_state_ == IncrementBatchLoopState::Incr_batch_size) {
// Read bulk length
@@ -180,8 +180,12 @@ Status Sync::incrementBatchLoop() {
if (bulk_data_str != "ping") {
auto bat = rocksdb::WriteBatch(bulk_data_str);
int count = static_cast<int>(bat.Count());
- parser_->ParseWriteBatch(bulk_data_str);
- auto s = updateNextSeq(next_seq_ + count);
+ auto s = parser_->ParseWriteBatch(bulk_data_str);
+ if (!s.IsOK()) {
+ return s.Prefixed(fmt::format("failed to parse write batch '{}'", Util::StringToHex(bulk_data_str)));
+ }
+
+ s = updateNextSeq(next_seq_ + count);
if (!s.IsOK()) {
return s.Prefixed("failed to update next sequence");
}