Posted to commits@kvrocks.apache.org by to...@apache.org on 2023/01/02 18:43:19 UTC

[incubator-kvrocks] branch unstable updated: Refactoring: check function return Status marked as [[nodiscard]] (#1214)

This is an automated email from the ASF dual-hosted git repository.

torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 52b0a610 Refactoring: check function return Status marked as [[nodiscard]] (#1214)
52b0a610 is described below

commit 52b0a6102d83d006a0e1dfc55f9d054036cd6df4
Author: Yaroslav <to...@gmail.com>
AuthorDate: Mon Jan 2 20:43:14 2023 +0200

    Refactoring: check function return Status marked as [[nodiscard]] (#1214)
    
    * Refactoring: if a function returns a Status marked as [[nodiscard]], check it
    
    * Add compiler option: treat unused-result warnings as errors
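
For context, here is a minimal sketch of the pattern this commit enforces, using a simplified stand-in for the kvrocks Status type (the real class, its Prefixed() helper, and the exact attribute placement may differ). Once the returned type is marked [[nodiscard]], any call whose result is ignored raises -Wunused-result, and the new -Werror=unused-result option turns that warning into a build error, so callers must either check or propagate the Status:

    #include <iostream>
    #include <string>
    #include <utility>

    // Simplified stand-in for the kvrocks Status type (illustrative only).
    // Marking the class [[nodiscard]] makes every by-value return of Status
    // trigger -Wunused-result when the caller ignores it.
    class [[nodiscard]] Status {
     public:
      enum Code { cOK, NotOK };

      Status() = default;
      Status(Code code, std::string msg) : code_(code), msg_(std::move(msg)) {}

      static Status OK() { return {}; }

      bool IsOK() const { return code_ == cOK; }
      const std::string &Msg() const { return msg_; }

      // Rough equivalent of the Prefixed() helper used throughout the diff:
      // wrap the original message with extra context before propagating it.
      Status Prefixed(const std::string &prefix) const { return {code_, prefix + ": " + msg_}; }

     private:
      Code code_ = cOK;
      std::string msg_;
    };

    Status WriteToDisk(bool ok) {
      if (!ok) return {Status::NotOK, "disk full"};
      return Status::OK();
    }

    Status SaveSnapshot() {
      // WriteToDisk(false);  // discarded result: fails to build with -Werror=unused-result
      if (auto s = WriteToDisk(false); !s.IsOK()) {
        return s.Prefixed("failed to save snapshot");  // check and propagate with context
      }
      return Status::OK();
    }

    int main() {
      auto s = SaveSnapshot();
      if (!s.IsOK()) std::cerr << s.Msg() << std::endl;
      return s.IsOK() ? 0 : 1;
    }

Building this sketch with g++ -std=c++17 -Werror=unused-result and uncommenting the discarded call should fail to compile, which is the same enforcement the CMakeLists.txt change below adds for the whole project.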
---
 CMakeLists.txt                      |  3 +-
 src/cluster/cluster.cc              | 34 +++++++++-----
 src/cluster/cluster.h               |  2 +-
 src/cluster/replication.cc          | 38 +++++++++++----
 src/cluster/replication.h           |  2 +-
 src/commands/redis_cmd.cc           | 93 +++++++++++++++++++++++--------------
 src/config/config.cc                | 10 +++-
 src/main.cc                         |  5 +-
 src/server/server.cc                | 46 +++++++++++-------
 src/server/server.h                 |  6 +--
 src/server/worker.cc                | 22 +++++----
 src/storage/batch_extractor.cc      |  4 +-
 src/storage/scripting.cc            |  3 +-
 src/storage/storage.cc              | 40 ++++++++--------
 src/storage/storage.h               |  9 ++--
 tests/cppunit/config_test.cc        | 56 +++++++++++++---------
 tests/cppunit/cron_test.cc          |  5 +-
 utils/kvrocks2redis/main.cc         |  5 +-
 utils/kvrocks2redis/redis_writer.cc | 27 ++++++++---
 utils/kvrocks2redis/sync.cc         | 14 ++++--
 20 files changed, 268 insertions(+), 156 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d4a89c26..a63f5ddc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -177,8 +177,7 @@ add_library(kvrocks_objs OBJECT ${KVROCKS_SRCS})
 
 target_include_directories(kvrocks_objs PUBLIC src src/common ${PROJECT_BINARY_DIR})
 target_compile_features(kvrocks_objs PUBLIC cxx_std_17)
-# TODO: Add -Werror=unused-result to compile options
-target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer)
+target_compile_options(kvrocks_objs PUBLIC -Wall -Wpedantic -Wsign-compare -Wreturn-type -fno-omit-frame-pointer -Werror=unused-result)
 if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
     target_compile_options(kvrocks_objs PUBLIC -Wno-pedantic)
 elseif((CMAKE_CXX_COMPILER_ID STREQUAL "Clang") OR (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang"))
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 515123b6..f89906bc 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -90,7 +90,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
   }
 
   // Set replication relationship
-  if (myself_ != nullptr) SetMasterSlaveRepl();
+  if (myself_) return SetMasterSlaveRepl();
 
   return Status::OK();
 }
@@ -217,7 +217,12 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
   }
 
   // Set replication relationship
-  if (myself_ != nullptr) SetMasterSlaveRepl();
+  if (myself_) {
+    s = SetMasterSlaveRepl();
+    if (!s.IsOK()) {
+      return s.Prefixed("failed to set master-replica replication");
+    }
+  }
 
   // Clear data of migrated slots
   if (!migrated_slots_.empty()) {
@@ -235,26 +240,31 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
 }
 
 // Set replication relationship by cluster topology setting
-void Cluster::SetMasterSlaveRepl() {
-  if (!svr_) return;
+Status Cluster::SetMasterSlaveRepl() {
+  if (!svr_) return Status::OK();
 
-  if (myself_ == nullptr) return;
+  if (!myself_) return Status::OK();
 
   if (myself_->role_ == kClusterMaster) {
     // Master mode
-    svr_->RemoveMaster();
+    auto s = svr_->RemoveMaster();
+    if (!s.IsOK()) {
+      return s.Prefixed("failed to remove master");
+    }
     LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
   } else if (nodes_.find(myself_->master_id_) != nodes_.end()) {
-    // Slave mode and master node is existing
+    // Replica mode and master node is existing
     std::shared_ptr<ClusterNode> master = nodes_[myself_->master_id_];
-    Status s = svr_->AddMaster(master->host_, master->port_, false);
-    if (s.IsOK()) {
-      LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
-    } else {
+    auto s = svr_->AddMaster(master->host_, master->port_, false);
+    if (!s.IsOK()) {
       LOG(WARNING) << "SLAVE OF " << master->host_ << ":" << master->port_
-                   << " enabled by cluster topology setting, encounter error: " << s.Msg();
+                   << " wasn't enabled by cluster topology setting, encounter error: " << s.Msg();
+      return s.Prefixed("failed to add master");
     }
+    LOG(INFO) << "SLAVE OF " << master->host_ << ":" << master->port_ << " enabled by cluster topology setting";
   }
+
+  return Status::OK();
 }
 
 bool Cluster::IsNotMaster() { return myself_ == nullptr || myself_->role_ != kClusterMaster || svr_->IsSlave(); }
diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h
index 7651a9a0..d947d8c3 100644
--- a/src/cluster/cluster.h
+++ b/src/cluster/cluster.h
@@ -90,7 +90,7 @@ class Cluster {
   bool IsWriteForbiddenSlot(int slot);
   Status CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
                          Redis::Connection *conn);
-  void SetMasterSlaveRepl();
+  Status SetMasterSlaveRepl();
   Status MigrateSlot(int slot, const std::string &dst_node_id);
   Status ImportSlot(Redis::Connection *conn, int slot, int state);
   std::string GetMyId() const { return myid_; }
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 01dca57f..415cd398 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -55,12 +55,16 @@ Status FeedSlaveThread::Start() {
       sigaddset(&mask, SIGHUP);
       sigaddset(&mask, SIGPIPE);
       pthread_sigmask(SIG_BLOCK, &mask, &omask);
-      Util::SockSend(conn_->GetFD(), "+OK\r\n");
+      auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
+      if (!s.IsOK()) {
+        LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
+        return;
+      }
       this->loop();
     });
   } catch (const std::system_error &e) {
     conn_ = nullptr;  // prevent connection was freed when failed to start the thread
-    return Status(Status::NotOK, e.what());
+    return {Status::NotOK, e.what()};
   }
   return Status::OK();
 }
@@ -544,7 +548,13 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
                          << Util::StringToHex(bulk_string);
               return CBState::RESTART;
             }
-            self->ParseWriteBatch(bulk_string);
+
+            s = self->ParseWriteBatch(bulk_string);
+            if (!s.IsOK()) {
+              LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << Util::StringToHex(bulk_string)
+                         << ": " << s.Msg();
+              return CBState::RESTART;
+            }
           }
           evbuffer_drain(input, self->incr_bulk_len_ + 2);
           self->incr_state_ = Incr_batch_size;
@@ -612,7 +622,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, v
         if (evbuffer_get_length(input) < self->fullsync_filesize_) {
           return CBState::AGAIN;
         }
-        meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input);
+        auto s =
+            Engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, self->fullsync_meta_id_, input, &meta);
+        if (!s.IsOK()) {
+          LOG(ERROR) << "[replication] Failed to parse meta and save: " << s.Msg();
+          return CBState::AGAIN;
+        }
         target_dir = self->srv_->GetConfig()->backup_sync_dir;
       } else {
         // Master using new version
@@ -895,13 +910,13 @@ void ReplicationThread::EventTimerCB(int, int16_t, void *ctx) {
   }
 }
 
-rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
+Status ReplicationThread::ParseWriteBatch(const std::string &batch_string) {
   rocksdb::WriteBatch write_batch(batch_string);
   WriteBatchHandler write_batch_handler;
-  rocksdb::Status status;
 
-  status = write_batch.Iterate(&write_batch_handler);
-  if (!status.ok()) return status;
+  auto db_status = write_batch.Iterate(&write_batch_handler);
+  if (!db_status.ok()) return {Status::NotOK, "failed to iterate over write batch: " + db_status.ToString()};
+
   switch (write_batch_handler.Type()) {
     case kBatchTypePublish:
       srv_->PublishMessage(write_batch_handler.Key(), write_batch_handler.Value());
@@ -910,7 +925,10 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
       if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) {
         std::vector<std::string> tokens = Util::TokenizeRedisProtocol(write_batch_handler.Value());
         if (!tokens.empty()) {
-          srv_->ExecPropagatedCommand(tokens);
+          auto s = srv_->ExecPropagatedCommand(tokens);
+          if (!s.IsOK()) {
+            return s.Prefixed("failed to execute propagate command");
+          }
         }
       }
       break;
@@ -927,7 +945,7 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri
     case kBatchTypeNone:
       break;
   }
-  return rocksdb::Status::OK();
+  return Status::OK();
 }
 
 bool ReplicationThread::isRestoringError(const char *err) {
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index a96cf4c1..c02d6806 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -196,7 +196,7 @@ class ReplicationThread {
 
   static void EventTimerCB(int, int16_t, void *ctx);
 
-  rocksdb::Status ParseWriteBatch(const std::string &batch_string);
+  Status ParseWriteBatch(const std::string &batch_string);
 };
 
 /*
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 5fc20a7b..4f72c859 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -4143,32 +4143,36 @@ class CommandSlaveOf : public Commander {
       return Status::OK();
     }
 
-    Status s;
     if (host_.empty()) {
-      s = svr->RemoveMaster();
-      if (s.IsOK()) {
-        *output = Redis::SimpleString("OK");
-        LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
-        if (svr->GetConfig()->cluster_enabled) {
-          svr->slot_migrate_->SetMigrateStopFlag(false);
-          LOG(INFO) << "Change server role to master, restart migration task";
-        }
+      auto s = svr->RemoveMaster();
+      if (!s.IsOK()) {
+        return s.Prefixed("failed to remove master");
       }
-    } else {
-      s = svr->AddMaster(host_, port_, false);
-      if (s.IsOK()) {
-        *output = Redis::SimpleString("OK");
-        LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
-                     << "')";
-        if (svr->GetConfig()->cluster_enabled) {
-          svr->slot_migrate_->SetMigrateStopFlag(true);
-          LOG(INFO) << "Change server role to slave, stop migration task";
-        }
-      } else {
-        LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
-                   << "') encounter error: " << s.Msg();
+
+      *output = Redis::SimpleString("OK");
+      LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
+      if (svr->GetConfig()->cluster_enabled) {
+        svr->slot_migrate_->SetMigrateStopFlag(false);
+        LOG(INFO) << "Change server role to master, restart migration task";
+      }
+
+      return Status::OK();
+    }
+
+    auto s = svr->AddMaster(host_, port_, false);
+    if (s.IsOK()) {
+      *output = Redis::SimpleString("OK");
+      LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
+                   << "')";
+      if (svr->GetConfig()->cluster_enabled) {
+        svr->slot_migrate_->SetMigrateStopFlag(true);
+        LOG(INFO) << "Change server role to slave, stop migration task";
       }
+    } else {
+      LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ << " (user request from '" << conn->GetAddr()
+                 << "') encounter error: " << s.Msg();
     }
+
     return s;
   }
 
@@ -4249,19 +4253,26 @@ class CommandPSync : public Commander {
     // be taken over, so should never trigger any event in worker thread.
     conn->Detach();
     conn->EnableFlag(Redis::Connection::kSlave);
-    Util::SockSetBlocking(conn->GetFD(), 1);
+    auto s = Util::SockSetBlocking(conn->GetFD(), 1);
+    if (!s.IsOK()) {
+      conn->EnableFlag(Redis::Connection::kCloseAsync);
+      return s.Prefixed("failed to set blocking mode on socket");
+    }
 
     svr->stats_.IncrPSyncOKCounter();
-    Status s = svr->AddSlave(conn, next_repl_seq);
+    s = svr->AddSlave(conn, next_repl_seq);
     if (!s.IsOK()) {
       std::string err = "-ERR " + s.Msg() + "\r\n";
-      Util::SockSend(conn->GetFD(), err);
+      s = Util::SockSend(conn->GetFD(), err);
+      if (!s.IsOK()) {
+        LOG(WARNING) << "failed to send error message to the replica: " << s.Msg();
+      }
       conn->EnableFlag(Redis::Connection::kCloseAsync);
-      LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start increment syncing";
+      LOG(WARNING) << "Failed to add replica: " << conn->GetAddr() << " to start incremental syncing";
     } else {
-      LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing";
+      LOG(INFO) << "New replica: " << conn->GetAddr() << " was added, start incremental syncing";
     }
-    return Status::OK();
+    return s;
   }
 
  private:
@@ -4962,7 +4973,11 @@ class CommandFetchMeta : public Commander {
     int repl_fd = conn->GetFD();
     std::string ip = conn->GetIP();
 
-    Util::SockSetBlocking(repl_fd, 1);
+    auto s = Util::SockSetBlocking(repl_fd, 1);
+    if (!s.IsOK()) {
+      return s.Prefixed("failed to set blocking mode on socket");
+    }
+
     conn->NeedNotClose();
     conn->EnableFlag(Redis::Connection::kCloseAsync);
     svr->stats_.IncrFullSyncCounter();
@@ -4975,9 +4990,11 @@ class CommandFetchMeta : public Commander {
       std::string files;
       auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files);
       if (!s.IsOK()) {
-        Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
-        LOG(WARNING) << "[replication] Failed to get full data file info,"
-                     << " error: " << s.Msg();
+        s = Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
+        if (!s.IsOK()) {
+          LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg();
+        }
+        LOG(WARNING) << "[replication] Failed to get full data file info: " << s.Msg();
         return;
       }
       // Send full data file info
@@ -5008,7 +5025,11 @@ class CommandFetchFile : public Commander {
     int repl_fd = conn->GetFD();
     std::string ip = conn->GetIP();
 
-    Util::SockSetBlocking(repl_fd, 1);
+    auto s = Util::SockSetBlocking(repl_fd, 1);
+    if (!s.IsOK()) {
+      return s.Prefixed("failed to set blocking mode on socket");
+    }
+
     conn->NeedNotClose();  // Feed-replica-file thread will close the replica fd
     conn->EnableFlag(Redis::Connection::kCloseAsync);
 
@@ -5351,7 +5372,11 @@ class CommandScript : public Commander {
 
     if (args_.size() == 2 && subcommand_ == "flush") {
       svr->ScriptFlush();
-      svr->Propagate(Engine::kPropagateScriptCommand, args_);
+      auto s = svr->Propagate(Engine::kPropagateScriptCommand, args_);
+      if (!s.IsOK()) {
+        LOG(ERROR) << "Failed to propagate script command: " << s.Msg();
+        return s;
+      }
       *output = Redis::SimpleString("OK");
     } else if (args_.size() >= 2 && subcommand_ == "exists") {
       *output = Redis::MultiLen(args_.size() - 2);
diff --git a/src/config/config.cc b/src/config/config.cc
index 8551bdda..8a8cb810 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -605,7 +605,10 @@ void Config::SetMaster(const std::string &host, uint32_t port) {
   master_port = port;
   auto iter = fields_.find("slaveof");
   if (iter != fields_.end()) {
-    iter->second->Set(master_host + " " + std::to_string(master_port));
+    auto s = iter->second->Set(master_host + " " + std::to_string(master_port));
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Failed to set the value of 'slaveof' setting: " << s.Msg();
+    }
   }
 }
 
@@ -614,7 +617,10 @@ void Config::ClearMaster() {
   master_port = 0;
   auto iter = fields_.find("slaveof");
   if (iter != fields_.end()) {
-    iter->second->Set("no one");
+    auto s = iter->second->Set("no one");
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Failed to clear the value of 'slaveof' setting: " << s.Msg();
+    }
   }
 }
 
diff --git a/src/main.cc b/src/main.cc
index 86d8e33c..8efd6a99 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -265,7 +265,10 @@ static Status createPidFile(const std::string &path) {
     return Status(Status::NotOK, strerror(errno));
   }
   std::string pid_str = std::to_string(getpid());
-  Util::Write(*fd, pid_str);
+  auto s = Util::Write(*fd, pid_str);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to write to PID-file");
+  }
   return Status::OK();
 }
 
diff --git a/src/server/server.cc b/src/server/server.cc
index 7b4a545e..2a622111 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -129,7 +129,10 @@ Status Server::Start() {
     if (!s.IsOK()) return s;
   } else {
     // Generate new replication id if not a replica
-    storage_->ShiftReplId();
+    auto s = storage_->ShiftReplId();
+    if (!s.IsOK()) {
+      return s.Prefixed("failed to shift replication id");
+    }
   }
 
   if (config_->cluster_enabled) {
@@ -255,9 +258,11 @@ Status Server::RemoveMaster() {
     master_host_.clear();
     master_port_ = 0;
     config_->ClearMaster();
-    if (replication_thread_) replication_thread_->Stop();
-    replication_thread_ = nullptr;
-    storage_->ShiftReplId();
+    if (replication_thread_) {
+      replication_thread_->Stop();
+      replication_thread_ = nullptr;
+    }
+    return storage_->ShiftReplId();
   }
   return Status::OK();
 }
@@ -524,40 +529,43 @@ void Server::UnblockOnStreams(const std::vector<std::string> &keys, Redis::Conne
   }
 }
 
-Status Server::WakeupBlockingConns(const std::string &key, size_t n_conns) {
+void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) {
   std::lock_guard<std::mutex> guard(blocking_keys_mu_);
   auto iter = blocking_keys_.find(key);
   if (iter == blocking_keys_.end() || iter->second.empty()) {
-    return Status(Status::NotOK);
+    return;
   }
+
   while (n_conns-- && !iter->second.empty()) {
     auto conn_ctx = iter->second.front();
-    conn_ctx->owner->EnableWriteEvent(conn_ctx->fd);
+    auto s = conn_ctx->owner->EnableWriteEvent(conn_ctx->fd);
+    if (!s.IsOK()) {
+      LOG(ERROR) << "failed to enable write event on blocked client " << conn_ctx->fd << ": " << s.Msg();
+    }
     delConnContext(conn_ctx);
     iter->second.pop_front();
   }
-  return Status::OK();
 }
 
-Status Server::OnEntryAddedToStream(const std::string &ns, const std::string &key,
-                                    const Redis::StreamEntryID &entry_id) {
+void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id) {
   std::lock_guard<std::mutex> guard(blocking_keys_mu_);
   auto iter = blocked_stream_consumers_.find(key);
   if (iter == blocked_stream_consumers_.end() || iter->second.empty()) {
-    return Status(Status::NotOK);
+    return;
   }
 
   for (auto it = iter->second.begin(); it != iter->second.end();) {
     auto consumer = *it;
     if (consumer->ns == ns && entry_id > consumer->last_consumed_id) {
-      consumer->owner->EnableWriteEvent(consumer->fd);
+      auto s = consumer->owner->EnableWriteEvent(consumer->fd);
+      if (!s.IsOK()) {
+        LOG(ERROR) << "failed to enable write event on blocked stream consumer " << consumer->fd << ": " << s.Msg();
+      }
       it = iter->second.erase(it);
     } else {
       ++it;
     }
   }
-
-  return Status::OK();
 }
 
 void Server::delConnContext(ConnContext *c) {
@@ -659,7 +667,7 @@ void Server::cron() {
       // Purge backup if needed, it will cost much disk space if we keep backup and full sync
       // checkpoints at the same time
       if (config_->purge_backup_on_fullsync && (storage_->ExistCheckpoint() || storage_->ExistSyncCheckpoint())) {
-        AsyncPurgeOldBackups(0, 0);
+        s = AsyncPurgeOldBackups(0, 0);
       }
     }
 
@@ -1361,7 +1369,9 @@ void Server::KillClient(int64_t *killed, const std::string &addr, uint64_t id, u
   if (IsSlave() &&
       (type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" + std::to_string(master_port_)))) {
     // Stop replication thread and start a new one to replicate
-    AddMaster(master_host_, master_port_, true);
+    if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) {
+      LOG(ERROR) << "Failed to add master " << master_host_ << ":" << master_port_ << "with error: " << s.Msg();
+    }
     (*killed)++;
   }
 }
@@ -1402,9 +1412,9 @@ Status Server::ScriptGet(const std::string &sha, std::string *body) {
   return Status::OK();
 }
 
-void Server::ScriptSet(const std::string &sha, const std::string &body) {
+Status Server::ScriptSet(const std::string &sha, const std::string &body) {
   std::string funcname = Engine::kLuaFunctionPrefix + sha;
-  storage_->WriteToPropagateCF(funcname, body);
+  return storage_->WriteToPropagateCF(funcname, body);
 }
 
 void Server::ScriptReset() {
diff --git a/src/server/server.h b/src/server/server.h
index b3a6ab5a..b888b15c 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -152,8 +152,8 @@ class Server {
   void BlockOnStreams(const std::vector<std::string> &keys, const std::vector<Redis::StreamEntryID> &entry_ids,
                       Redis::Connection *conn);
   void UnblockOnStreams(const std::vector<std::string> &keys, Redis::Connection *conn);
-  Status WakeupBlockingConns(const std::string &key, size_t n_conns);
-  Status OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id);
+  void WakeupBlockingConns(const std::string &key, size_t n_conns);
+  void OnEntryAddedToStream(const std::string &ns, const std::string &key, const Redis::StreamEntryID &entry_id);
 
   std::string GetLastRandomKeyCursor();
   void SetLastRandomKeyCursor(const std::string &cursor);
@@ -194,7 +194,7 @@ class Server {
   lua_State *Lua() { return lua_; }
   Status ScriptExists(const std::string &sha);
   Status ScriptGet(const std::string &sha, std::string *body);
-  void ScriptSet(const std::string &sha, const std::string &body);
+  Status ScriptSet(const std::string &sha, const std::string &body);
   void ScriptReset();
   void ScriptFlush();
 
diff --git a/src/server/worker.cc b/src/server/worker.cc
index c7f82ebc..8121a187 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -159,10 +159,13 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
   auto conn = new Redis::Connection(bev, worker);
   bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn);
   bufferevent_enable(bev, EV_READ);
-  Status status = worker->AddConnection(conn);
-  if (!status.IsOK()) {
-    std::string err_msg = Redis::Error("ERR " + status.Msg());
-    Util::SockSend(fd, err_msg);
+  s = worker->AddConnection(conn);
+  if (!s.IsOK()) {
+    std::string err_msg = Redis::Error("ERR " + s.Msg());
+    s = Util::SockSend(fd, err_msg);
+    if (!s.IsOK()) {
+      LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+    }
     conn->Close();
     return;
   }
@@ -187,10 +190,13 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
   auto conn = new Redis::Connection(bev, worker);
   bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, conn);
   bufferevent_enable(bev, EV_READ);
-  Status status = worker->AddConnection(conn);
-  if (!status.IsOK()) {
-    std::string err_msg = Redis::Error("ERR " + status.Msg());
-    Util::SockSend(fd, err_msg);
+  auto s = worker->AddConnection(conn);
+  if (!s.IsOK()) {
+    std::string err_msg = Redis::Error("ERR " + s.Msg());
+    s = Util::SockSend(fd, err_msg);
+    if (!s.IsOK()) {
+      LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+    }
     conn->Close();
     return;
   }
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 8d893db9..ac57ffdc 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -38,7 +38,9 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
     }
   } else {
     // Redis type log data
-    log_data_.Decode(blob);
+    if (auto s = log_data_.Decode(blob); !s.IsOK()) {
+      LOG(WARNING) << "Failed to decode Redis type log: " << s.Msg();
+    }
   }
 }
 
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 1c3b3a71..82694855 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -926,8 +926,7 @@ Status createFunction(Server *srv, const std::string &body, std::string *sha, lu
     return Status(Status::NotOK, "Error running script (new function): " + errMsg + "\n");
   }
   // would store lua function into propagate column family and propagate those scripts to slaves
-  srv->ScriptSet(*sha, body);
-  return Status::OK();
+  return srv->ScriptSet(*sha, body);
 }
 
 }  // namespace Lua
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index efb7e469..ef020bfd 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -77,7 +77,7 @@ Storage::Storage(Config *config) : env_(rocksdb::Env::Default()), config_(config
 }
 
 Storage::~Storage() {
-  if (backup_ != nullptr) {
+  if (backup_) {
     DestroyBackup();
   }
   CloseDB();
@@ -248,7 +248,9 @@ Status Storage::Open(bool read_only) {
   size_t subkey_block_cache_size = config_->RocksDB.subkey_block_cache_size * MiB;
 
   rocksdb::Options options = InitOptions();
-  CreateColumnFamilies(options);
+  if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
+    return s.Prefixed("failed to create column families");
+  }
 
   std::shared_ptr<rocksdb::Cache> shared_block_cache;
   if (config_->RocksDB.share_metadata_and_subkey_block_cache) {
@@ -372,10 +374,9 @@ Status Storage::CreateBackup() {
   return Status::OK();
 }
 
-Status Storage::DestroyBackup() {
+void Storage::DestroyBackup() {
   backup_->StopBackup();
   delete backup_;
-  return Status();
 }
 
 Status Storage::RestoreFromBackup() {
@@ -604,7 +605,7 @@ uint64_t Storage::GetTotalSize(const std::string &ns) {
   return total_size;
 }
 
-Status Storage::CheckDBSizeLimit() {
+void Storage::CheckDBSizeLimit() {
   bool reach_db_size_limit = false;
   if (config_->max_db_size == 0) {
     reach_db_size_limit = false;
@@ -612,8 +613,9 @@ Status Storage::CheckDBSizeLimit() {
     reach_db_size_limit = GetTotalSize() >= config_->max_db_size * GiB;
   }
   if (reach_db_size_limit_ == reach_db_size_limit) {
-    return Status::OK();
+    return;
   }
+
   reach_db_size_limit_ = reach_db_size_limit;
   if (reach_db_size_limit_) {
     LOG(WARNING) << "[storage] ENABLE db_size limit " << config_->max_db_size << " GB"
@@ -621,7 +623,6 @@ Status Storage::CheckDBSizeLimit() {
   } else {
     LOG(WARNING) << "[storage] DISABLE db_size limit, set kvrocks to read-write mode ";
   }
-  return Status::OK();
 }
 
 void Storage::SetIORateLimit(int64_t max_io_mb) {
@@ -645,12 +646,12 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
   return Status::OK();
 }
 
-bool Storage::ShiftReplId() {
+Status Storage::ShiftReplId() {
   const char *charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
   const int charset_len = static_cast<int>(strlen(charset));
 
   // Do nothing if don't enable rsid psync
-  if (!config_->use_rsid_psync) return true;
+  if (!config_->use_rsid_psync) return Status::OK();
 
   std::random_device rd;
   std::mt19937 gen(rd() + getpid());
@@ -663,8 +664,7 @@ bool Storage::ShiftReplId() {
   LOG(INFO) << "[replication] New replication id: " << replid_;
 
   // Write new replication id into db engine
-  WriteToPropagateCF(kReplicationIdKey, replid_);
-  return true;
+  return WriteToPropagateCF(kReplicationIdKey, replid_);
 }
 
 std::string Storage::GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq) {
@@ -834,10 +834,8 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage, const std::string &
   return rv;
 }
 
-Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(Storage *storage,
-                                                                              rocksdb::BackupID meta_id,
-                                                                              evbuffer *evbuf) {
-  Storage::ReplDataManager::MetaInfo meta;
+Status Storage::ReplDataManager::ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf,
+                                                  Storage::ReplDataManager::MetaInfo *meta) {
   auto meta_file = "meta/" + std::to_string(meta_id);
   DLOG(INFO) << "[meta] id: " << meta_id;
 
@@ -850,16 +848,16 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St
   // timestamp;
   UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
   DLOG(INFO) << "[meta] timestamp: " << line.get();
-  meta.timestamp = std::strtoll(line.get(), nullptr, 10);
+  meta->timestamp = std::strtoll(line.get(), nullptr, 10);
   // sequence
   line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
   DLOG(INFO) << "[meta] seq:" << line.get();
-  meta.seq = std::strtoull(line.get(), nullptr, 10);
+  meta->seq = std::strtoull(line.get(), nullptr, 10);
   // optional metadata
   line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
   if (strncmp(line.get(), "metadata", 8) == 0) {
     DLOG(INFO) << "[meta] meta: " << line.get();
-    meta.meta_data = std::string(line.get(), line.length);
+    meta->meta_data = std::string(line.get(), line.length);
     line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
   }
   DLOG(INFO) << "[meta] file count: " << line.get();
@@ -877,10 +875,10 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(St
     while (*(cptr++) != ' ') {
     }
     auto crc32 = std::strtoul(cptr, nullptr, 10);
-    meta.files.emplace_back(filename, crc32);
+    meta->files.emplace_back(filename, crc32);
   }
-  SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
-  return meta;
+
+  return SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
 }
 
 Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) {
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 271d1a60..ee28e6b6 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -79,7 +79,7 @@ class Storage {
   Status SetDBOption(const std::string &key, const std::string &value);
   Status CreateColumnFamilies(const rocksdb::Options &options);
   Status CreateBackup();
-  Status DestroyBackup();
+  void DestroyBackup();
   Status RestoreFromBackup();
   Status RestoreFromCheckpoint();
   Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
@@ -103,7 +103,7 @@ class Storage {
   LockManager *GetLockManager() { return &lock_mgr_; }
   void PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours);
   uint64_t GetTotalSize(const std::string &ns = kDefaultNamespace);
-  Status CheckDBSizeLimit();
+  void CheckDBSizeLimit();
   void SetIORateLimit(int64_t max_io_mb);
 
   std::unique_ptr<RWLock::ReadLock> ReadLockGuard();
@@ -139,7 +139,8 @@ class Storage {
       // [[filename, checksum]...]
       std::vector<std::pair<std::string, uint32_t>> files;
     };
-    static MetaInfo ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf);
+    static Status ParseMetaAndSave(Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf,
+                                   Storage::ReplDataManager::MetaInfo *meta);
     static std::unique_ptr<rocksdb::WritableFile> NewTmpFile(Storage *storage, const std::string &dir,
                                                              const std::string &repl_file);
     static Status SwapTmpFile(Storage *storage, const std::string &dir, const std::string &repl_file);
@@ -155,7 +156,7 @@ class Storage {
   void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; }
   bool IsDBInRetryableIOError() { return db_in_retryable_io_error_; }
 
-  bool ShiftReplId();
+  Status ShiftReplId();
   std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
   std::string GetReplIdFromDbEngine();
 
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index f58b6c6e..7c7b934a 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -18,8 +18,6 @@
  *
  */
 
-#include "config.h"
-
 #include <gtest/gtest.h>
 
 #include <fstream>
@@ -35,7 +33,8 @@ TEST(Config, GetAndSet) {
   const char *path = "test.conf";
   Config config;
 
-  config.Load(CLIOptions(path));
+  auto s = config.Load(CLIOptions(path));
+  EXPECT_FALSE(s.IsOK());
   std::map<std::string, std::string> mutable_cases = {
       {"timeout", "1000"},
       {"maxclients", "2000"},
@@ -82,7 +81,7 @@ TEST(Config, GetAndSet) {
   };
   std::vector<std::string> values;
   for (const auto &iter : mutable_cases) {
-    auto s = config.Set(nullptr, iter.first, iter.second);
+    s = config.Set(nullptr, iter.first, iter.second);
     ASSERT_TRUE(s.IsOK());
     config.Get(iter.first, &values);
     ASSERT_TRUE(s.IsOK());
@@ -91,9 +90,10 @@ TEST(Config, GetAndSet) {
     EXPECT_EQ(values[1], iter.second);
   }
   ASSERT_TRUE(config.Rewrite().IsOK());
-  config.Load(CLIOptions(path));
+  s = config.Load(CLIOptions(path));
+  EXPECT_TRUE(s.IsOK());
   for (const auto &iter : mutable_cases) {
-    auto s = config.Set(nullptr, iter.first, iter.second);
+    s = config.Set(nullptr, iter.first, iter.second);
     ASSERT_TRUE(s.IsOK());
     config.Get(iter.first, &values);
     ASSERT_EQ(values.size(), 2);
@@ -125,7 +125,7 @@ TEST(Config, GetAndSet) {
       {"rocksdb.row_cache_size", "100"},
   };
   for (const auto &iter : immutable_cases) {
-    auto s = config.Set(nullptr, iter.first, iter.second);
+    s = config.Set(nullptr, iter.first, iter.second);
     ASSERT_FALSE(s.IsOK());
   }
 }
@@ -184,7 +184,8 @@ TEST(Namespace, Add) {
   unlink(path);
 
   Config config;
-  config.Load(CLIOptions(path));
+  auto s = config.Load(CLIOptions(path));
+  EXPECT_FALSE(s.IsOK());
   config.slot_id_encoded = false;
   EXPECT_TRUE(!config.AddNamespace("ns", "t0").IsOK());
   config.requirepass = "foobared";
@@ -196,15 +197,16 @@ TEST(Namespace, Add) {
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
     std::string token;
-    config.GetNamespace(namespaces[i], &token);
+    s = config.GetNamespace(namespaces[i], &token);
+    EXPECT_TRUE(s.IsOK());
     EXPECT_EQ(token, tokens[i]);
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
-    auto s = config.AddNamespace(namespaces[i], tokens[i]);
+    s = config.AddNamespace(namespaces[i], tokens[i]);
     EXPECT_FALSE(s.IsOK());
     EXPECT_EQ(s.Msg(), "the token has already exists");
   }
-  auto s = config.AddNamespace("n1", "t0");
+  s = config.AddNamespace("n1", "t0");
   EXPECT_FALSE(s.IsOK());
   EXPECT_EQ(s.Msg(), "the namespace has already exists");
 
@@ -219,14 +221,15 @@ TEST(Namespace, Set) {
   unlink(path);
 
   Config config;
-  config.Load(CLIOptions(path));
+  auto s = config.Load(CLIOptions(path));
+  EXPECT_FALSE(s.IsOK());
   config.slot_id_encoded = false;
   config.requirepass = "foobared";
   std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
   std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
   std::vector<std::string> new_tokens = {"nt1", "nt2'", "nt3", "nt4"};
   for (size_t i = 0; i < namespaces.size(); i++) {
-    auto s = config.SetNamespace(namespaces[i], tokens[i]);
+    s = config.SetNamespace(namespaces[i], tokens[i]);
     EXPECT_FALSE(s.IsOK());
     EXPECT_EQ(s.Msg(), "the namespace was not found");
   }
@@ -235,7 +238,8 @@ TEST(Namespace, Set) {
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
     std::string token;
-    config.GetNamespace(namespaces[i], &token);
+    s = config.GetNamespace(namespaces[i], &token);
+    EXPECT_TRUE(s.IsOK());
     EXPECT_EQ(token, tokens[i]);
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
@@ -243,7 +247,8 @@ TEST(Namespace, Set) {
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
     std::string token;
-    config.GetNamespace(namespaces[i], &token);
+    s = config.GetNamespace(namespaces[i], &token);
+    EXPECT_TRUE(s.IsOK());
     EXPECT_EQ(token, new_tokens[i]);
   }
   unlink(path);
@@ -254,7 +259,8 @@ TEST(Namespace, Delete) {
   unlink(path);
 
   Config config;
-  config.Load(CLIOptions(path));
+  auto s = config.Load(CLIOptions(path));
+  EXPECT_FALSE(s.IsOK());
   config.slot_id_encoded = false;
   config.requirepass = "foobared";
   std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
@@ -264,13 +270,16 @@ TEST(Namespace, Delete) {
   }
   for (size_t i = 0; i < namespaces.size(); i++) {
     std::string token;
-    config.GetNamespace(namespaces[i], &token);
+    s = config.GetNamespace(namespaces[i], &token);
+    EXPECT_TRUE(s.IsOK());
     EXPECT_EQ(token, tokens[i]);
   }
   for (const auto &ns : namespaces) {
-    config.DelNamespace(ns);
+    s = config.DelNamespace(ns);
+    EXPECT_TRUE(s.IsOK());
     std::string token;
-    config.GetNamespace(ns, &token);
+    s = config.GetNamespace(ns, &token);
+    EXPECT_FALSE(s.IsOK());
     EXPECT_TRUE(token.empty());
   }
   unlink(path);
@@ -280,7 +289,8 @@ TEST(Namespace, RewriteNamespaces) {
   const char *path = "test.conf";
   unlink(path);
   Config config;
-  config.Load(CLIOptions(path));
+  auto s = config.Load(CLIOptions(path));
+  EXPECT_FALSE(s.IsOK());
   config.requirepass = "test";
   config.backup_dir = "test";
   config.slot_id_encoded = false;
@@ -293,10 +303,12 @@ TEST(Namespace, RewriteNamespaces) {
   EXPECT_TRUE(config.DelNamespace("to-be-deleted-ns").IsOK());
 
   Config new_config;
-  auto s = new_config.Load(CLIOptions(path));
+  s = new_config.Load(CLIOptions(path));
+  EXPECT_TRUE(s.IsOK());
   for (size_t i = 0; i < namespaces.size(); i++) {
     std::string token;
-    new_config.GetNamespace(namespaces[i], &token);
+    s = new_config.GetNamespace(namespaces[i], &token);
+    EXPECT_TRUE(s.IsOK());
     EXPECT_EQ(token, tokens[i]);
   }
 
diff --git a/tests/cppunit/cron_test.cc b/tests/cppunit/cron_test.cc
index 98c5d422..69d017b8 100644
--- a/tests/cppunit/cron_test.cc
+++ b/tests/cppunit/cron_test.cc
@@ -29,9 +29,10 @@ class CronTest : public testing::Test {
   explicit CronTest() {
     cron = std::make_unique<Cron>();
     std::vector<std::string> schedule{"*", "3", "*", "*", "*"};
-    cron->SetScheduleTime(schedule);
+    auto s = cron->SetScheduleTime(schedule);
+    EXPECT_TRUE(s.IsOK());
   }
-  ~CronTest() = default;
+  ~CronTest() override = default;
 
  protected:
   std::unique_ptr<Cron> cron;
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index 03dcd357..41590108 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -88,7 +88,10 @@ static Status createPidFile(const std::string &path) {
     return Status(Status::NotOK, strerror(errno));
   }
   std::string pid_str = std::to_string(getpid());
-  Util::Write(fd, pid_str);
+  auto s = Util::Write(fd, pid_str);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to write to PID-file");
+  }
   close(fd);
   return Status::OK();
 }
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 7b38545d..6d3c845b 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -67,7 +67,10 @@ Status RedisWriter::FlushDB(const std::string &ns) {
     return s;
   }
 
-  updateNextOffset(ns, 0);
+  s = updateNextOffset(ns, 0);
+  if (!s.IsOK()) {
+    return s;
+  }
 
   s = Write(ns, {Redis::Command2RESP({"FLUSHDB"})});
   if (!s.IsOK()) return s;
@@ -136,7 +139,11 @@ void RedisWriter::sync() {
           Stop();
           return;
         }
-        updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
+        s = updateNextOffset(iter.first, next_offsets_[iter.first] + getted_line_leng);
+        if (!s.IsOK()) {
+          LOG(ERROR) << "ERR updating next offset: " << s.Msg();
+          break;
+        }
       }
       std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
@@ -173,7 +180,11 @@ Status RedisWriter::getRedisConn(const std::string &ns, const std::string &host,
 
 Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
   const auto auth_len_str = std::to_string(auth.length());
-  Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF);
+  auto s = Util::SockSend(redis_fds_[ns], "*2" CRLF "$4" CRLF "auth" CRLF "$" + auth_len_str + CRLF + auth + CRLF);
+  if (!s.IsOK()) {
+    return s.Prefixed("[kvrocks2redis] failed to send AUTH command");
+  }
+
   std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read redis auth response err"));
   if (line.compare(0, 3, "+OK") != 0) {
     return {Status::NotOK, "[kvrocks2redis] redis Auth failed: " + line};
@@ -184,8 +195,11 @@ Status RedisWriter::authRedis(const std::string &ns, const std::string &auth) {
 Status RedisWriter::selectDB(const std::string &ns, int db_number) {
   const auto db_number_str = std::to_string(db_number);
   const auto db_number_str_len = std::to_string(db_number_str.length());
-  Util::SockSend(redis_fds_[ns],
-                 "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF);
+  auto s = Util::SockSend(redis_fds_[ns],
+                          "*2" CRLF "$6" CRLF "select" CRLF "$" + db_number_str_len + CRLF + db_number_str + CRLF);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to send SELECT command to socket");
+  }
   LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for response";
   std::string line = GET_OR_RET(Util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response err"));
   if (line.compare(0, 3, "+OK") != 0) {
@@ -224,8 +238,7 @@ Status RedisWriter::writeNextOffsetToFile(const std::string &ns, std::istream::o
     offset_string += " ";
   }
   offset_string += '\0';
-  Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
-  return Status::OK();
+  return Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
 }
 
 std::string RedisWriter::getNextOffsetFilePath(const std::string &ns) {
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index 97352551..bda9d899 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -180,7 +180,10 @@ Status Sync::incrementBatchLoop() {
           auto bat = rocksdb::WriteBatch(bulk_data_str);
           int count = static_cast<int>(bat.Count());
           parser_->ParseWriteBatch(bulk_data_str);
-          updateNextSeq(next_seq_ + count);
+          auto s = updateNextSeq(next_seq_ + count);
+          if (!s.IsOK()) {
+            return s.Prefixed("failed to update next sequence");
+          }
         }
         evbuffer_drain(evbuf, incr_bulk_len_ + 2);
         incr_state_ = Incr_batch_size;
@@ -209,7 +212,11 @@ void Sync::parseKVFromLocalStorage() {
     LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: " << s.Msg();
     return;
   }
-  updateNextSeq(storage_->LatestSeq() + 1);
+
+  s = updateNextSeq(storage_->LatestSeq() + 1);
+  if (!s.IsOK()) {
+    LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " << s.Msg();
+  }
 }
 
 Status Sync::updateNextSeq(rocksdb::SequenceNumber seq) {
@@ -242,6 +249,5 @@ Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber seq) {
     seq_string += " ";
   }
   seq_string += '\0';
-  Util::Pwrite(next_seq_fd_, seq_string, 0);
-  return Status::OK();
+  return Util::Pwrite(next_seq_fd_, seq_string, 0);
 }