You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by to...@apache.org on 2023/01/02 18:43:19 UTC
[incubator-kvrocks] branch unstable updated: Refactoring: check function return Status marked as [[nodiscard]] (#1214)
This is an automated email from the ASF dual-hosted git repository.
torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 52b0a610 Refactoring: check function return Status marked as [[nodiscard]] (#1214)
52b0a610 is described below
commit 52b0a6102d83d006a0e1dfc55f9d054036cd6df4
Author: Yaroslav <to...@gmail.com>
AuthorDate: Mon Jan 2 20:43:14 2023 +0200
Refactoring: check function return Status marked as [[nodiscard]] (#1214)
* Refactoring: if the function returns Status marked as [[nodiscard]], check it
* Add compiler option: treat unused-result warnings as errors
---
CMakeLists.txt | 3 +-
src/cluster/cluster.cc | 34 +++++++++-----
src/cluster/cluster.h | 2 +-
src/cluster/replication.cc | 38 +++++++++++----
src/cluster/replication.h | 2 +-
src/commands/redis_cmd.cc | 93 +++++++++++++++++++++++--------------
src/config/config.cc | 10 +++-
src/main.cc | 5 +-
src/server/server.cc | 46 +++++++++++-------
src/server/server.h | 6 +--
src/server/worker.cc | 22 +++++----
src/storage/batch_extractor.cc | 4 +-
src/storage/scripting.cc | 3 +-
src/storage/storage.cc | 40 ++++++++--------
src/storage/storage.h | 9 ++--
tests/cppunit/config_test.cc | 56 +++++++++++++---------
tests/cppunit/cron_test.cc | 5 +-
utils/kvrocks2redis/main.cc | 5 +-
utils/kvrocks2redis/redis_writer.cc | 27 ++++++++---
utils/kvrocks2redis/sync.cc | 14 ++++--
20 files changed, 268 insertions(+), 156 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d4a89c26..a63f5ddc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -177,8 +177,7 @@ add_library(kvrocks_objs OBJECT ${KVROCKS_SRCS})
target_include_directories(kvrocks_objs PUBLIC src src/common ${PROJECT_BINARY_DIR})
target_compile_features(kvrocks_objs PUBLIC cxx_std_17)
-# TODO: Add -Werror=unused-result to compile options
-target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer)
+target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer -Werror=unused-result)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
target_compile_options(kvrocks_objs PUBLIC -Wno-pedantic)
elseif((CMAKE_CXX_COMPILER_ID STREQUAL "Clang") OR (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang"))
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 515123b6..f89906bc 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -90,7 +90,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}
// Set replication relationship
- if (myself_ != nullptr) SetMasterSlaveRepl();
+ if (myself_) return SetMasterSlaveRepl();
return Status::OK();
}
@@ -217,7 +217,12 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}
// Set replication relationship
- if (myself_ != nullptr) SetMasterSlaveRepl();
+ if (myself_) {
+ s = SetMasterSlaveRepl();
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to set master-replica replication");
+ }
+ }
// Clear data of migrated slots
if (!migrated_slots_.empty()) {
@@ -235,26 +240,31 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}
// Set replication relationship by cluster topology setting
-void Cluster::SetMasterSlaveRepl() {
- if (!svr_) return;
+Status Cluster::SetMasterSlaveRepl() {
+ if (!svr_) return Status::OK();
- if (myself_ == nullptr) return;
+ if (!myself_) return Status::OK();
if (myself_->role_ == kClusterMaster) {
// Master mode
- svr_->RemoveMaster();
+ auto s = svr_->RemoveMaster();
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to remove master");
+ }
LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
} else if (nodes_.find(myself_->master_id_) != nodes_.end()) {
- // Slave mode and master node is existing
+ // Replica mode and master node is existing
std::shared_ptr<ClusterNode> master = nodes_[myself_->master_id_];
- Status s = svr_->AddMaster(master->host_, master->port_, false);
- if (s.IsOK()) {
- LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
- } else {
+ auto s = svr_->AddMaster(master->host_, master->port_, false);
+ if (!s.IsOK()) {
LOG(WARNING) << "SLAVE OF " << master->host_ << ":" << master->port_
- << " enabled by cluster topology setting, encounter error: " << s.Msg();
+ << " wasn't enabled by cluster topology setting, encounter error: " << s.Msg();
+ return s.Prefixed("failed to add master");
}
+ LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
}
+
+ return Status::OK();
}
bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role_ != kClusterMaster || svr_->IsSlave(); }
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 7651a9a0..d947d8c3 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -90,7 +90,7 @@ class Cluster {
bool IsWriteForbiddenSlot(int slot);
Status CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn);
- void SetMasterSlaveRepl();
+ Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status ImportSlot(Redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 01dca57f..415cd398 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -55,12 +55,16 @@ Status FeedSlaveThread::Start() {
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
- Util::SockSend(conn_->GetFD(), "+OK\r\n");
+ auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
+ if (!s.IsOK()) {
+ LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
+ return;
+ }
this->loop();
});
} catch (const std::system_error &e) {
conn_ = nullptr; // prevent connection was freed when failed to start the thread
- return Status(Status::NotOK, e.what());
+ return {Status::NotOK, e.what()};
}
return Status::OK();
}
@@ -544,7 +548,13 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
<< Util::StringToHex(bulk_string);
return CBState::RESTART;
}
- self->ParseWriteBatch(bulk_string);
+
+ s = self->ParseWriteBatch(bulk_string);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << Util::StringToHex(bulk_string)
+ << ": " << s.Msg();
+ return CBState::RESTART;
+ }
}
evbuffer_drain(input, self->incr_bulk_len_ + 2);
self->incr_state_ = Incr_batch_size;
@@ -612,7 +622,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, v
if (evbuffer_get_length(input) < self->fullsync_filesize_) {
return CBState::AGAIN;
}
- meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input);
+ auto s =
+ Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input, &meta);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "[replication] Failed to parse meta and save: " << s.Msg();
+ return CBState::AGAIN;
+ }
target_dir = self->srv_->GetConfig()->backup_sync_dir;
} else {
// Master using new version
@@ -895,13 +910,13 @@ void ReplicationThread::EventTimerCB(int, int16_t, void *ctx) {
}
}
-rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
+Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
WriteBatchHandler write_batch_handler;
- rocksdb::Status status;
- status = write_batch.Iterate(&write_batch_handler);
- if (!status.ok()) return status;
+ auto db_status = write_batch.Iterate(&write_batch_handler);
+ if (!db_status.ok()) return {Status::NotOK, "failed to iterate over write batch: " + db_status.ToString()};
+
switch (write_batch_handler.Type()) {
case kBatchTypePublish:
srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value());
@@ -910,7 +925,10 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) {
std::vector<std::string> tokens = Util::TokenizeRedisProtocol(write_batch_handler.Value());
if (!tokens.empty()) {
- srv_->ExecPropagatedCommand(tokens);
+ auto s = srv_->ExecPropagatedCommand(tokens);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to execute propagate command");
+ }
}
}
break;
@@ -927,7 +945,7 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
case kBatchTypeNone:
break;
}
- return rocksdb::Status::OK();
+ return Status::OK();
}
bool ReplicationThread::isRestoringError(const char *err) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index a96cf4c1..c02d6806 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -196,7 +196,7 @@ class ReplicationThread {
static void EventTimerCB(int, int16_t, void *ctx);
- rocksdb::Status ParseWriteBatch(const std::string &batch_string);
+ Status ParseWriteBatch(const std::string &batch_string);
};
/*
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 5fc20a7b..4f72c859 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -4143,32 +4143,36 @@ class CommandSlaveOf : public Commander {
return Status::OK();
}
- Status s;
if (host_.empty()) {
- s = svr->RemoveMaster();
- if (s.IsOK()) {
- *output = Redis::SimpleString("OK");
- LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
- if (svr->GetConfig()->cluster_enabled) {
- svr->slot_migrate_->SetMigrateStopFlag(false);
- LOG(INFO) << "Change server role to master, restart migration task";
- }
+ auto s = svr->RemoveMaster();
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to remove master");
}
- } else {
- s = svr->AddMaster(host_, port_, false);
- if (s.IsOK()) {
- *output = Redis::SimpleString("OK");
- LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
- << "')";
- if (svr->GetConfig()->cluster_enabled) {
- svr->slot_migrate_->SetMigrateStopFlag(true);
- LOG(INFO) << "Change server role to slave, stop migration task";
- }
- } else {
- LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
- << "') encounter error: " << s.Msg();
+
+ *output = Redis::SimpleString("OK");
+ LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
+ if (svr->GetConfig()->cluster_enabled) {
+ svr->slot_migrate_->SetMigrateStopFlag(false);
+ LOG(INFO) << "Change server role to master, restart migration task";
+ }
+
+ return Status::OK();
+ }
+
+ auto s = svr->AddMaster(host_, port_, false);
+ if (s.IsOK()) {
+ *output = Redis::SimpleString("OK");
+ LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
+ << "')";
+ if (svr->GetConfig()->cluster_enabled) {
+ svr->slot_migrate_->SetMigrateStopFlag(true);
+ LOG(INFO) << "Change server role to slave, stop migration task";
}
+ } else {
+ LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
+ << "') encounter error: " << s.Msg();
}
+
return s;
}
@@ -4249,19 +4253,26 @@ class CommandPSync : public Commander {
// be taken over, so should never trigger any event in worker thread.
conn->Detach();
conn->EnableFlag(Redis::Connection::kSlave);
- Util::SockSetBlocking(conn->GetFD(), 1);
+ auto s = Util::SockSetBlocking(conn->GetFD(), 1);
+ if (!s.IsOK()) {
+ conn->EnableFlag(Redis::Connection::kCloseAsync);
+ return s.Prefixed("failed to set blocking mode on socket");
+ }
svr->stats_.IncrPSyncOKCounter();
- Status s = svr->AddSlave(conn, next_repl_seq);
+ s = svr->AddSlave(conn, next_repl_seq);
if (!s.IsOK()) {
std::string err = "-ERR " + s.Msg() + "\r\n";
- Util::SockSend(conn->GetFD(), err);
+ s = Util::SockSend(conn->GetFD(), err);
+ if (!s.IsOK()) {
+ LOG(WARNING) << "failed to send error message to the replica: " << s.Msg();
+ }
conn->EnableFlag(Redis::Connection::kCloseAsync);
- LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start increment syncing";
+ LOG(WARNING) << "Failed to add replica: " << conn->GetAddr() << " to start incremental syncing";
} else {
- LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing";
+ LOG(INFO) << "New replica: " << conn->GetAddr() << " was added, start incremental syncing";
}
- return Status::OK();
+ return s;
}
private:
@@ -4962,7 +4973,11 @@ class CommandFetchMeta : public Commander {
int repl_fd = conn->GetFD();
std::string ip = conn->GetIP();
- Util::SockSetBlocking(repl_fd, 1);
+ auto s = Util::SockSetBlocking(repl_fd, 1);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to set blocking mode on socket");
+ }
+
conn->NeedNotClose();
conn->EnableFlag(Redis::Connection::kCloseAsync);
svr->stats_.IncrFullSyncCounter();
@@ -4975,9 +4990,11 @@ class CommandFetchMeta : public Commander {
std::string files;
auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files);
if (!s.IsOK()) {
- Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
- LOG(WARNING) << "[replication] Failed to get full data file info,"
- << " error: " << s.Msg();
+ s = Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
+ if (!s.IsOK()) {
+ LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg();
+ }
+ LOG(WARNING) << "[replication] Failed to get full data file info: " << s.Msg();
return;
}
// Send full data file info
@@ -5008,7 +5025,11 @@ class CommandFetchFile : public Commander {
int repl_fd = conn->GetFD();
std::string ip = conn->GetIP();
- Util::SockSetBlocking(repl_fd, 1);
+ auto s = Util::SockSetBlocking(repl_fd, 1);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to set blocking mode on socket");
+ }
+
conn->NeedNotClose(); // Feed-replica-file thread will close the replica fd
conn->EnableFlag(Redis::Connection::kCloseAsync);
@@ -5351,7 +5372,11 @@ class CommandScript : public Commander {
if (args_.size() == 2 && subcommand_ == "flush") {
svr->ScriptFlush();
- svr->Propagate(Engine::kPropagateScriptCommand, args_);
+ auto s = svr->Propagate(Engine::kPropagateScriptCommand, args_);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "Failed to propagate script command: " << s.Msg();
+ return s;
+ }
*output = Redis::SimpleString("OK");
} else if (args_.size() >= 2 && subcommand_ == "exists") {
*output = Redis::MultiLen(args_.size() - 2);
diff --git a/src/config/config.cc b/src/config/config.cc
index 8551bdda..8a8cb810 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -605,7 +605,10 @@ void Config::SetMaster(const std::string &host, uint32_t port) {
master_port = port;
auto iter = fields_.find("slaveof");
if (iter != fields_.end()) {
- iter->second->Set(master_host + " " + std::to_string(master_port));
+ auto s = iter->second->Set(master_host + " " + std::to_string(master_port));
+ if (!s.IsOK()) {
+ LOG(ERROR) << "Failed to set the value of 'slaveof' setting: " << s.Msg();
+ }
}
}
@@ -614,7 +617,10 @@ void Config::ClearMaster() {
master_port = 0;
auto iter = fields_.find("slaveof");
if (iter != fields_.end()) {
- iter->second->Set("no one");
+ auto s = iter->second->Set("no one");
+ if (!s.IsOK()) {
+ LOG(ERROR) << "Failed to clear the value of 'slaveof' setting: " << s.Msg();
+ }
}
}
diff --git a/src/main.cc b/src/main.cc
index 86d8e33c..8efd6a99 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -265,7 +265,10 @@ static Status createPidFile(const std::string &path) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
- Util::Write(*fd, pid_str);
+ auto s = Util::Write(*fd, pid_str);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to write to PID-file");
+ }
return Status::OK();
}
diff --git a/src/server/server.cc b/src/server/server.cc
index 7b4a545e..2a622111 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -129,7 +129,10 @@ Status Server::Start() {
if (!s.IsOK()) return s;
} else {
// Generate new replication id if not a replica
- storage_->ShiftReplId();
+ auto s = storage_->ShiftReplId();
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to shift replication id");
+ }
}
if (config_->cluster_enabled) {
@@ -255,9 +258,11 @@ Status Server::RemoveMaster() {
master_host_.clear();
master_port_ = 0;
config_->ClearMaster();
- if (replication_thread_) replication_thread_->Stop();
- replication_thread_ = nullptr;
- storage_->ShiftReplId();
+ if (replication_thread_) {
+ replication_thread_->Stop();
+ replication_thread_ = nullptr;
+ }
+ return storage_->ShiftReplId();
}
return Status::OK();
}
@@ -524,40 +529,43 @@ void Server::UnblockOnStreams(const std::vector<std::string> &keys, Redis::Conne
}
}
-Status Server::WakeupBlockingConns(const std::string &key, size_t n_conns) {
+void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) {
std::lock_guard<std::mutex> guard(blocking_keys_mu_);
auto iter = blocking_keys_.find(key);
if (iter == blocking_keys_.end() || iter->second.empty()) {
- return Status(Status::NotOK);
+ return;
}
+
while (n_conns-- && !iter->second.empty()) {
auto conn_ctx = iter->second.front();
- conn_ctx->owner->EnableWriteEvent(conn_ctx->fd);
+ auto s = conn_ctx->owner->EnableWriteEvent(conn_ctx->fd);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "failed to enable write event on blocked client " << conn_ctx->fd << ": " << s.Msg();
+ }
delConnContext(conn_ctx);
iter->second.pop_front();
}
- return Status::OK();
}
-Status Server::OnEntryAddedToStream(const std::string &ns, const std::string &key,
- const Redis::StreamEntryID &entry_id) {
+void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id) {
std::lock_guard<std::mutex> guard(blocking_keys_mu_);
auto iter = blocked_stream_consumers_.find(key);
if (iter == blocked_stream_consumers_.end() || iter->second.empty()) {
- return Status(Status::NotOK);
+ return;
}
for (auto it = iter->second.begin(); it != iter->second.end();) {
auto consumer = *it;
if (consumer->ns == ns && entry_id > consumer->last_consumed_id) {
- consumer->owner->EnableWriteEvent(consumer->fd);
+ auto s = consumer->owner->EnableWriteEvent(consumer->fd);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "failed to enable write event on blocked stream consumer " << consumer->fd << ": " << s.Msg();
+ }
it = iter->second.erase(it);
} else {
++it;
}
}
-
- return Status::OK();
}
void Server::delConnContext(ConnContext *c) {
@@ -659,7 +667,7 @@ void Server::cron() {
// Purge backup if needed, it will cost much disk space if we keep backup and full sync
// checkpoints at the same time
if (config_->purge_backup_on_fullsync && (storage_->ExistCheckpoint() || storage_->ExistSyncCheckpoint())) {
- AsyncPurgeOldBackups(0, 0);
+ s = AsyncPurgeOldBackups(0, 0);
}
}
@@ -1361,7 +1369,9 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u
if (IsSlave() &&
(type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" + std::to_string(master_port_)))) {
// Stop replication thread and start a new one to replicate
- AddMaster(master_host_, master_port_, true);
+ if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) {
+ LOG(ERROR) << "Failed to add master " << master_host_ << ":" << master_port_ << "with error: " << s.Msg();
+ }
(*killed)++;
}
}
@@ -1402,9 +1412,9 @@ Status Server::ScriptGet(const std::string &sha, std::string *body) {
return Status::OK();
}
-void Server::ScriptSet(const std::string &sha, const std::string &body) {
+Status Server::ScriptSet(const std::string &sha, const std::string &body) {
std::string funcname = Engine::kLuaFunctionPrefix + sha;
- storage_->WriteToPropagateCF(funcname, body);
+ return storage_->WriteToPropagateCF(funcname, body);
}
void Server::ScriptReset() {
diff --git a/src/server/server.h b/src/server/server.h
index b3a6ab5a..b888b15c 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -152,8 +152,8 @@ class Server {
void BlockOnStreams(const std::vector<std::string> &keys, const std::vector<Redis::StreamEntryID> &entry_ids,
Redis::Connection *conn);
void UnblockOnStreams(const std::vector<std::string> &keys, Redis::Connection *conn);
- Status WakeupBlockingConns(const std::string &key, size_t n_conns);
- Status OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id);
+ void WakeupBlockingConns(const std::string &key, size_t n_conns);
+ void OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id);
std::string GetLastRandomKeyCursor();
void SetLastRandomKeyCursor(const std::string &cursor);
@@ -194,7 +194,7 @@ class Server {
lua_State *Lua() { return lua_; }
Status ScriptExists(const std::string &sha);
Status ScriptGet(const std::string &sha, std::string *body);
- void ScriptSet(const std::string &sha, const std::string &body);
+ Status ScriptSet(const std::string &sha, const std::string &body);
void ScriptReset();
void ScriptFlush();
diff --git a/src/server/worker.cc b/src/server/worker.cc
index c7f82ebc..8121a187 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -159,10 +159,13 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
auto conn = new Redis::Connection(bev, worker);
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn);
bufferevent_enable(bev, EV_READ);
- Status status = worker->AddConnection(conn);
- if (!status.IsOK()) {
- std::string err_msg = Redis::Error("ERR " + status.Msg());
- Util::SockSend(fd, err_msg);
+ s = worker->AddConnection(conn);
+ if (!s.IsOK()) {
+ std::string err_msg = Redis::Error("ERR " + s.Msg());
+ s = Util::SockSend(fd, err_msg);
+ if (!s.IsOK()) {
+ LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+ }
conn->Close();
return;
}
@@ -187,10 +190,13 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
auto conn = new Redis::Connection(bev, worker);
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn);
bufferevent_enable(bev, EV_READ);
- Status status = worker->AddConnection(conn);
- if (!status.IsOK()) {
- std::string err_msg = Redis::Error("ERR " + status.Msg());
- Util::SockSend(fd, err_msg);
+ auto s = worker->AddConnection(conn);
+ if (!s.IsOK()) {
+ std::string err_msg = Redis::Error("ERR " + s.Msg());
+ s = Util::SockSend(fd, err_msg);
+ if (!s.IsOK()) {
+ LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+ }
conn->Close();
return;
}
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 8d893db9..ac57ffdc 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -38,7 +38,9 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
}
} else {
// Redis type log data
- log_data_.Decode(blob);
+ if (auto s = log_data_.Decode(blob); !s.IsOK()) {
+ LOG(WARNING) << "Failed to decode Redis type log: " << s.Msg();
+ }
}
}
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 1c3b3a71..82694855 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -926,8 +926,7 @@ Status createFunction(Server *srv, const std::string &body, std::string *sha, lu
return Status(Status::NotOK, "Error running script (new function): " + errMsg + "\n");
}
// would store lua function into propagate column family and propagate those scripts to slaves
- srv->ScriptSet(*sha, body);
- return Status::OK();
+ return srv->ScriptSet(*sha, body);
}
} // namespace Lua
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index efb7e469..ef020bfd 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -77,7 +77,7 @@ Storage::Storage(Config *config) : env_(rocksdb::Env::Default()), config_(config
}
Storage::~Storage() {
- if (backup_ != nullptr) {
+ if (backup_) {
DestroyBackup();
}
CloseDB();
@@ -248,7 +248,9 @@ Status Storage::Open(bool read_only) {
size_t subkey_block_cache_size = config_->RocksDB.subkey_block_cache_size * MiB;
rocksdb::Options options = InitOptions();
- CreateColumnFamilies(options);
+ if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
+ return s.Prefixed("failed to create column families");
+ }
std::shared_ptr<rocksdb::Cache> shared_block_cache;
if (config_->RocksDB.share_metadata_and_subkey_block_cache) {
@@ -372,10 +374,9 @@ Status Storage::CreateBackup() {
return Status::OK();
}
-Status Storage::DestroyBackup() {
+void Storage::DestroyBackup() {
backup_->StopBackup();
delete backup_;
- return Status();
}
Status Storage::RestoreFromBackup() {
@@ -604,7 +605,7 @@ uint64_t Storage::GetTotalSize(const std::string &ns) {
return total_size;
}
-Status Storage::CheckDBSizeLimit() {
+void Storage::CheckDBSizeLimit() {
bool reach_db_size_limit = false;
if (config_->max_db_size == 0) {
reach_db_size_limit = false;
@@ -612,8 +613,9 @@ Status Storage::CheckDBSizeLimit() {
reach_db_size_limit = GetTotalSize() >= config_->max_db_size * GiB;
}
if (reach_db_size_limit_ == reach_db_size_limit) {
- return Status::OK();
+ return;
}
+
reach_db_size_limit_ = reach_db_size_limit;
if (reach_db_size_limit_) {
LOG(WARNING) << "[storage] ENABLE db_size limit " << config_->max_db_size << " GB"
@@ -621,7 +623,6 @@ Status Storage::CheckDBSizeLimit() {
} else {
LOG(WARNING) << "[storage] DISABLE db_size limit, set kvrocks to read-write mode ";
}
- return Status::OK();
}
void Storage::SetIORateLimit(int64_t max_io_mb) {
@@ -645,12 +646,12 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
return Status::OK();
}
-bool Storage::ShiftReplId() {
+Status Storage::ShiftReplId() {
const char *charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
const int charset_len = static_cast<int>(strlen(charset));
// Do nothing if don't enable rsid psync
- if (!config_->use_rsid_psync) return true;
+ if (!config_->use_rsid_psync) return Status::OK();
std::random_device rd;
std::mt19937 gen(rd() + getpid());
@@ -663,8 +664,7 @@ bool Storage::ShiftReplId() {
LOG(INFO) << "[replication] New replication id: " << replid_;
// Write new replication id into db engine
- WriteToPropagateCF(kReplicationIdKey, replid_);
- return true;
+ return WriteToPropagateCF(kReplicationIdKey, replid_);
}
std::string Storage::GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq) {
@@ -834,10 +834,8 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage, const std::string &
return rv;
}
-Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(Storage *storage,
- rocksdb::BackupID meta_id,
- evbuffer *evbuf) {
- Storage::ReplDataManager::MetaInfo meta;
+Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf,
+ Storage::ReplDataManager::MetaInfo *meta) {
auto meta_file = "meta/" + std::to_string(meta_id);
DLOG(INFO) << "[meta] id: " << meta_id;
@@ -850,16 +848,16 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St
// timestamp;
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
DLOG(INFO) << "[meta] timestamp: " << line.get();
- meta.timestamp = std::strtoll(line.get(), nullptr, 10);
+ meta->timestamp = std::strtoll(line.get(), nullptr, 10);
// sequence
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
DLOG(INFO) << "[meta] seq:" << line.get();
- meta.seq = std::strtoull(line.get(), nullptr, 10);
+ meta->seq = std::strtoull(line.get(), nullptr, 10);
// optional metadata
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
if (strncmp(line.get(), "metadata", 8) == 0) {
DLOG(INFO) << "[meta] meta: " << line.get();
- meta.meta_data = std::string(line.get(), line.length);
+ meta->meta_data = std::string(line.get(), line.length);
line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
}
DLOG(INFO) << "[meta] file count: " << line.get();
@@ -877,10 +875,10 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St
while (*(cptr++) != ' ') {
}
auto crc32 = std::strtoul(cptr, nullptr, 10);
- meta.files.emplace_back(filename, crc32);
+ meta->files.emplace_back(filename, crc32);
}
- SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
- return meta;
+
+ return SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
}
Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) {
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 271d1a60..ee28e6b6 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -79,7 +79,7 @@ class Storage {
Status SetDBOption(const std::string &key, const std::string &value);
Status CreateColumnFamilies(const rocksdb::Options &options);
Status CreateBackup();
- Status DestroyBackup();
+ void DestroyBackup();
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
@@ -103,7 +103,7 @@ class Storage {
LockManager *GetLockManager() { return &lock_mgr_; }
void PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours);
uint64_t GetTotalSize(const std::string &ns = kDefaultNamespace);
- Status CheckDBSizeLimit();
+ void CheckDBSizeLimit();
void SetIORateLimit(int64_t max_io_mb);
std::unique_ptr<RWLock::ReadLock> ReadLockGuard();
@@ -139,7 +139,8 @@ class Storage {
// [[filename, checksum]...]
std::vector<std::pair<std::string, uint32_t>> files;
};
- static MetaInfo ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf);
+ static Status ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf,
+ Storage::ReplDataManager::MetaInfo *meta);
static std::unique_ptr<rocksdb::WritableFile> NewTmpFile(Storage *storage, const std::string &dir,
const std::string &repl_file);
static Status SwapTmpFile(Storage *storage, const std::string &dir, const std::string &repl_file);
@@ -155,7 +156,7 @@ class Storage {
void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; }
bool IsDBInRetryableIOError() { return db_in_retryable_io_error_; }
- bool ShiftReplId();
+ Status ShiftReplId();
std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
std::string GetReplIdFromDbEngine();
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index f58b6c6e..7c7b934a 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -18,8 +18,6 @@
*
*/
-#include "config.h"
-
#include <gtest/gtest.h>
#include <fstream>
@@ -35,7 +33,8 @@ TEST(Config, GetAndSet) {
const char *path = "test.conf";
Config config;
- config.Load(CLIOptions(path));
+ auto s = config.Load(CLIOptions(path));
+ EXPECT_FALSE(s.IsOK());
std::map<std::string, std::string> mutable_cases = {
{"timeout", "1000"},
{"maxclients", "2000"},
@@ -82,7 +81,7 @@ TEST(Config, GetAndSet) {
};
std::vector<std::string> values;
for (const auto &iter : mutable_cases) {
- auto s = config.Set(nullptr, iter.first, iter.second);
+ s = config.Set(nullptr, iter.first, iter.second);
ASSERT_TRUE(s.IsOK());
config.Get(iter.first, &values);
ASSERT_TRUE(s.IsOK());
@@ -91,9 +90,10 @@ TEST(Config, GetAndSet) {
EXPECT_EQ(values[1], iter.second);
}
ASSERT_TRUE(config.Rewrite().IsOK());
- config.Load(CLIOptions(path));
+ s = config.Load(CLIOptions(path));
+ EXPECT_TRUE(s.IsOK());
for (const auto &iter : mutable_cases) {
- auto s = config.Set(nullptr, iter.first, iter.second);
+ s = config.Set(nullptr, iter.first, iter.second);
ASSERT_TRUE(s.IsOK());
config.Get(iter.first, &values);
ASSERT_EQ(values.size(), 2);
@@ -125,7 +125,7 @@ TEST(Config, GetAndSet) {
{"rocksdb.row_cache_size", "100"},
};
for (const auto &iter : immutable_cases) {
- auto s = config.Set(nullptr, iter.first, iter.second);
+ s = config.Set(nullptr, iter.first, iter.second);
ASSERT_FALSE(s.IsOK());
}
}
@@ -184,7 +184,8 @@ TEST(Namespace, Add) {
unlink(path);
Config config;
- config.Load(CLIOptions(path));
+ auto s = config.Load(CLIOptions(path));
+ EXPECT_FALSE(s.IsOK());
config.slot_id_encoded = false;
EXPECT_TRUE(!config.AddNamespace("ns", "t0").IsOK());
config.requirepass = "foobared";
@@ -196,15 +197,16 @@ TEST(Namespace, Add) {
}
for (size_t i = 0; i < namespaces.size(); i++) {
std::string token;
- config.GetNamespace(namespaces[i], &token);
+ s = config.GetNamespace(namespaces[i], &token);
+ EXPECT_TRUE(s.IsOK());
EXPECT_EQ(token, tokens[i]);
}
for (size_t i = 0; i < namespaces.size(); i++) {
- auto s = config.AddNamespace(namespaces[i], tokens[i]);
+ s = config.AddNamespace(namespaces[i], tokens[i]);
EXPECT_FALSE(s.IsOK());
EXPECT_EQ(s.Msg(), "the token has already exists");
}
- auto s = config.AddNamespace("n1", "t0");
+ s = config.AddNamespace("n1", "t0");
EXPECT_FALSE(s.IsOK());
EXPECT_EQ(s.Msg(), "the namespace has already exists");
@@ -219,14 +221,15 @@ TEST(Namespace, Set) {
unlink(path);
Config config;
- config.Load(CLIOptions(path));
+ auto s = config.Load(CLIOptions(path));
+ EXPECT_FALSE(s.IsOK());
config.slot_id_encoded = false;
config.requirepass = "foobared";
std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
std::vector<std::string> new_tokens = {"nt1", "nt2'", "nt3", "nt4"};
for (size_t i = 0; i < namespaces.size(); i++) {
- auto s = config.SetNamespace(namespaces[i], tokens[i]);
+ s = config.SetNamespace(namespaces[i], tokens[i]);
EXPECT_FALSE(s.IsOK());
EXPECT_EQ(s.Msg(), "the namespace was not found");
}
@@ -235,7 +238,8 @@ TEST(Namespace, Set) {
}
for (size_t i = 0; i < namespaces.size(); i++) {
std::string token;
- config.GetNamespace(namespaces[i], &token);
+ s = config.GetNamespace(namespaces[i], &token);
+ EXPECT_TRUE(s.IsOK());
EXPECT_EQ(token, tokens[i]);
}
for (size_t i = 0; i < namespaces.size(); i++) {
@@ -243,7 +247,8 @@ TEST(Namespace, Set) {
}
for (size_t i = 0; i < namespaces.size(); i++) {
std::string token;
- config.GetNamespace(namespaces[i], &token);
+ s = config.GetNamespace(namespaces[i], &token);
+ EXPECT_TRUE(s.IsOK());
EXPECT_EQ(token, new_tokens[i]);
}
unlink(path);
@@ -254,7 +259,8 @@ TEST(Namespace, Delete) {
unlink(path);
Config config;
- config.Load(CLIOptions(path));
+ auto s = config.Load(CLIOptions(path));
+ EXPECT_FALSE(s.IsOK());
config.slot_id_encoded = false;
config.requirepass = "foobared";
std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
@@ -264,13 +270,16 @@ TEST(Namespace, Delete) {
}
for (size_t i = 0; i < namespaces.size(); i++) {
std::string token;
- config.GetNamespace(namespaces[i], &token);
+ s = config.GetNamespace(namespaces[i], &token);
+ EXPECT_TRUE(s.IsOK());
EXPECT_EQ(token, tokens[i]);
}
for (const auto &ns : namespaces) {
- config.DelNamespace(ns);
+ s = config.DelNamespace(ns);
+ EXPECT_TRUE(s.IsOK());
std::string token;
- config.GetNamespace(ns, &token);
+ s = config.GetNamespace(ns, &token);
+ EXPECT_FALSE(s.IsOK());
EXPECT_TRUE(token.empty());
}
unlink(path);
@@ -280,7 +289,8 @@ TEST(Namespace, RewriteNamespaces) {
const char *path = "test.conf";
unlink(path);
Config config;
- config.Load(CLIOptions(path));
+ auto s = config.Load(CLIOptions(path));
+ EXPECT_FALSE(s.IsOK());
config.requirepass = "test";
config.backup_dir = "test";
config.slot_id_encoded = false;
@@ -293,10 +303,12 @@ TEST(Namespace, RewriteNamespaces) {
EXPECT_TRUE(config.DelNamespace("to-be-deleted-ns").IsOK());
Config new_config;
- auto s = new_config.Load(CLIOptions(path));
+ s = new_config.Load(CLIOptions(path));
+ EXPECT_TRUE(s.IsOK());
for (size_t i = 0; i < namespaces.size(); i++) {
std::string token;
- new_config.GetNamespace(namespaces[i], &token);
+ s = new_config.GetNamespace(namespaces[i], &token);
+ EXPECT_TRUE(s.IsOK());
EXPECT_EQ(token, tokens[i]);
}
diff --git a/tests/cppunit/cron_test.cc b/tests/cppunit/cron_test.cc
index 98c5d422..69d017b8 100644
--- a/tests/cppunit/cron_test.cc
+++ b/tests/cppunit/cron_test.cc
@@ -29,9 +29,10 @@ class CronTest : public testing::Test {
explicit CronTest() {
cron = std::make_unique<Cron>();
std::vector<std::string> schedule{"*", "3", "*", "*", "*"};
- cron->SetScheduleTime(schedule);
+ auto s = cron->SetScheduleTime(schedule);
+ EXPECT_TRUE(s.IsOK());
}
- ~CronTest() = default;
+ ~CronTest() override = default;
protected:
std::unique_ptr<Cron> cron;
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index 03dcd357..41590108 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -88,7 +88,10 @@ static Status createPidFile(const std::string &path) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
- Util::Write(fd, pid_str);
+ auto s = Util::Write(fd, pid_str);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to write to PID-file");
+ }
close(fd);
return Status::OK();
}
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 7b38545d..6d3c845b 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -67,7 +67,10 @@ Status RedisWriter::FlushDB(const std::string &ns) {
return s;
}
- updateNextOffset(ns, 0);
+ s = updateNextOffset(ns, 0);
+ if (!s.IsOK()) {
+ return s;
+ }
s = Write(ns, {Redis::Command2RESP({"FLUSHDB"})});
if (!s.IsOK()) return s;
@@ -136,7 +139,11 @@ void RedisWriter::sync() {
Stop();
return;
}
- updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
+ s = updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "ERR updating next offset: " << s.Msg();
+ break;
+ }
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
@@ -173,7 +180,11 @@ Status RedisWriter::getRedisConn(const std::string &ns, const std::string &host,
Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
const auto auth_len_str = std::to_string(auth.length());
- Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF);
+ auto s = Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF);
+ if (!s.IsOK()) {
+ return s.Prefixed("[kvrocks2redis] failed to send AUTH command");
+ }
+
std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read redis auth response err"));
if (line.compare(0, 3, "+OK") != 0) {
return {Status::NotOK, "[kvrocks2redis] redis Auth failed: " + line};
@@ -184,8 +195,11 @@ Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
Status RedisWriter::selectDB(const std::string &ns, int db_number) {
const auto db_number_str = std::to_string(db_number);
const auto db_number_str_len = std::to_string(db_number_str.length());
- Util::SockSend(redis_fds_[ns],
- "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF);
+ auto s = Util::SockSend(redis_fds_[ns],
+ "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to send SELECT command to socket");
+ }
LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for response";
std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response err"));
if (line.compare(0, 3, "+OK") != 0) {
@@ -224,8 +238,7 @@ Status RedisWriter::writeNextOffsetToFile(const std::string &ns, std::istream::o
offset_string += " ";
}
offset_string += '\0';
- Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
- return Status::OK();
+ return Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
}
std::string RedisWriter::getNextOffsetFilePath(const std::string &ns) {
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index 97352551..bda9d899 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -180,7 +180,10 @@ Status Sync::incrementBatchLoop() {
auto bat = rocksdb::WriteBatch(bulk_data_str);
int count = static_cast<int>(bat.Count());
parser_->ParseWriteBatch(bulk_data_str);
- updateNextSeq(next_seq_ + count);
+ auto s = updateNextSeq(next_seq_ + count);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to update next sequence");
+ }
}
evbuffer_drain(evbuf, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
@@ -209,7 +212,11 @@ void Sync::parseKVFromLocalStorage() {
LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " << s.Msg();
return;
}
- updateNextSeq(storage_->LatestSeq() + 1);
+
+ s = updateNextSeq(storage_->LatestSeq() + 1);
+ if (!s.IsOK()) {
+ LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " << s.Msg();
+ }
}
Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) {
@@ -242,6 +249,5 @@ Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber seq) {
seq_string += " ";
}
seq_string += '\0';
- Util::Pwrite(next_seq_fd_, seq_string, 0);
- return Status::OK();
+ return Util::Pwrite(next_seq_fd_, seq_string, 0);
}