You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by ti...@apache.org on 2022/06/26 01:26:54 UTC

[incubator-kvrocks] branch unstable updated: Add some RAII type for evbuffer to avoid manual free (#645)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new dd25baf  Add some RAII type for evbuffer to avoid manual free (#645)
dd25baf is described below

commit dd25baf3fbf145a17fcb63158f5166e78ee19e6d
Author: Twice <tw...@gmail.com>
AuthorDate: Sun Jun 26 09:26:50 2022 +0800

    Add some RAII type for evbuffer to avoid manual free (#645)
---
 .github/workflows/kvrocks.yaml |   2 +-
 cppcheck.sh                    |  10 +++-
 src/event_util.h               |  59 ++++++++++++++++++
 src/redis_request.cc           |  39 +++++-------
 src/replication.cc             | 132 +++++++++++++++--------------------------
 src/slot_migrate.cc            |  27 ++++-----
 src/storage.cc                 |  42 ++++++-------
 src/util.cc                    |  15 ++---
 8 files changed, 165 insertions(+), 161 deletions(-)

diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml
index 2ce5eae..6e367d8 100644
--- a/.github/workflows/kvrocks.yaml
+++ b/.github/workflows/kvrocks.yaml
@@ -39,7 +39,7 @@ jobs:
 
   lint:
     name: Lint and check code
-    runs-on: ubuntu-18.04
+    runs-on: ubuntu-latest
     steps:
       - name: Checkout Code Base
         uses: actions/checkout@v3
diff --git a/cppcheck.sh b/cppcheck.sh
index 439df36..752ec78 100755
--- a/cppcheck.sh
+++ b/cppcheck.sh
@@ -17,8 +17,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-CHECK_TYPES="warning,portability,information,missingInclude"
+# we should run cmake configuration to fetch deps if we want to enable missingInclude
+CHECK_TYPES="warning,portability,information"
 STANDARD=c++11
 ERROR_EXITCODE=1
 LANG=c++
-cppcheck --force --enable=${CHECK_TYPES} -U__GNUC__ -x ${LANG}  src --std=${STANDARD} --error-exitcode=${ERROR_EXITCODE}
+
+set -ex
+cppcheck --version
+cppcheck \
+    --force --enable=${CHECK_TYPES} -U__GNUC__ -x ${LANG}  src --std=${STANDARD} --error-exitcode=${ERROR_EXITCODE} \
+    --inline-suppr --suppress=noCopyConstructor:src/server.cc --suppress=noOperatorEq:src/server.cc -j$(nproc)
diff --git a/src/event_util.h b/src/event_util.h
new file mode 100644
index 0000000..81c06b5
--- /dev/null
+++ b/src/event_util.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <cstdlib>
+#include <utility>
+
+#include "event2/buffer.h"
+
+template <typename F, F *f> struct StaticFunction {
+  template <typename... Ts> auto operator()(Ts &&...args) const -> decltype(f(std::forward<Ts>(args)...)) {
+    return f(std::forward<Ts>(args)...);
+  }
+};
+
+using StaticFree = StaticFunction<decltype(std::free), std::free>;
+
+template <typename T>
+struct UniqueFreePtr : std::unique_ptr<T, StaticFree> {
+  using base_type = std::unique_ptr<T, StaticFree>;
+
+  using base_type::base_type;
+};
+
+struct UniqueEvbufReadln : UniqueFreePtr<char[]> {
+    // cppcheck-suppress uninitMemberVar
+    UniqueEvbufReadln(evbuffer* buffer, evbuffer_eol_style eol_style)
+      : UniqueFreePtr(evbuffer_readln(buffer, &length, eol_style)) {}
+
+    size_t length;
+};
+
+using StaticEvbufFree = StaticFunction<decltype(evbuffer_free), evbuffer_free>;
+
+struct UniqueEvbuf : std::unique_ptr<evbuffer, StaticEvbufFree> {
+  using base_type = std::unique_ptr<evbuffer, StaticEvbufFree>;
+
+  UniqueEvbuf() : base_type(evbuffer_new()) {}
+  explicit UniqueEvbuf(evbuffer *buffer) : base_type(buffer) {}
+};
diff --git a/src/redis_request.cc b/src/redis_request.cc
index 1fdd5e9..158f531 100644
--- a/src/redis_request.cc
+++ b/src/redis_request.cc
@@ -31,6 +31,7 @@
 #include "redis_connection.h"
 #include "server.h"
 #include "redis_slot.h"
+#include "event_util.h"
 
 namespace Redis {
 const size_t PROTO_INLINE_MAX_SIZE = 16 * 1024L;
@@ -38,74 +39,64 @@ const size_t PROTO_BULK_MAX_SIZE = 512 * 1024L * 1024L;
 const size_t PROTO_MULTI_MAX_SIZE = 1024 * 1024L;
 
 Status Request::Tokenize(evbuffer *input) {
-  char *line;
-  size_t len;
   size_t pipeline_size = 0;
   while (true) {
     switch (state_) {
-      case ArrayLen:
-        line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
-        if (!line || len <= 0) {
+      case ArrayLen: {
+        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
+        if (!line || line.length <= 0) {
           if (pipeline_size > 128) {
             LOG(INFO) << "Large pipeline detected: " << pipeline_size;
           }
           if (line) {
-            free(line);
             continue;
           }
           return Status::OK();
         }
         pipeline_size++;
-        svr_->stats_.IncrInbondBytes(len);
+        svr_->stats_.IncrInbondBytes(line.length);
         if (line[0] == '*') {
           try {
-            multi_bulk_len_ = std::stoll(std::string(line + 1, len-1));
+            multi_bulk_len_ = std::stoll(std::string(line.get() + 1, line.length - 1));
           } catch (std::exception &e) {
-            free(line);
             return Status(Status::NotOK, "Protocol error: invalid multibulk length");
           }
           if (multi_bulk_len_ <= 0) {
               multi_bulk_len_ = 0;
-              free(line);
               continue;
           }
           if (multi_bulk_len_ > (int64_t)PROTO_MULTI_MAX_SIZE) {
-            free(line);
             return Status(Status::NotOK, "Protocol error: invalid multibulk length");
           }
           state_ = BulkLen;
         } else {
-          if (len > PROTO_INLINE_MAX_SIZE) {
-            free(line);
+          if (line.length > PROTO_INLINE_MAX_SIZE) {
             return Status(Status::NotOK, "Protocol error: invalid bulk length");
           }
-          tokens_ = Util::Split(std::string(line, len), " \t");
+          tokens_ = Util::Split(std::string(line.get(), line.length), " \t");
           commands_.emplace_back(std::move(tokens_));
           state_ = ArrayLen;
         }
-        free(line);
         break;
-      case BulkLen:
-        line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
-        if (!line || len <= 0) return Status::OK();
-        svr_->stats_.IncrInbondBytes(len);
+      }
+      case BulkLen: {
+        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
+        if (!line || line.length <= 0) return Status::OK();
+        svr_->stats_.IncrInbondBytes(line.length);
         if (line[0] != '$') {
-          free(line);
           return Status(Status::NotOK, "Protocol error: expected '$'");
         }
         try {
-          bulk_len_ = std::stoull(std::string(line + 1, len-1));
+          bulk_len_ = std::stoull(std::string(line.get() + 1, line.length - 1));
         } catch (std::exception &e) {
-          free(line);
           return Status(Status::NotOK, "Protocol error: invalid bulk length");
         }
         if (bulk_len_ > PROTO_BULK_MAX_SIZE) {
-          free(line);
           return Status(Status::NotOK, "Protocol error: invalid bulk length");
         }
-        free(line);
         state_ = BulkData;
         break;
+      }
       case BulkData:
         if (evbuffer_get_length(input) < bulk_len_ + 2) return Status::OK();
         char *data = reinterpret_cast<char *>(evbuffer_pullup(input, bulk_len_ + 2));
diff --git a/src/replication.cc b/src/replication.cc
index d08f77c..76bd4f7 100644
--- a/src/replication.cc
+++ b/src/replication.cc
@@ -37,6 +37,7 @@
 #include "util.h"
 #include "status.h"
 #include "server.h"
+#include "event_util.h"
 
 FeedSlaveThread::~FeedSlaveThread() {
   delete conn_;
@@ -395,18 +396,14 @@ ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev,
 
 ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev,
                                                          void *ctx) {
-  char *line;
-  size_t line_len;
   auto input = bufferevent_get_input(bev);
-  line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+  UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
-  if (strncmp(line, "+OK", 3) != 0) {
+  if (strncmp(line.get(), "+OK", 3) != 0) {
     // Auth failed
-    LOG(ERROR) << "[replication] Auth failed: " << line;
-    free(line);
+    LOG(ERROR) << "[replication] Auth failed: " << line.get();
     return CBState::RESTART;
   }
-  free(line);
   LOG(INFO) << "[replication] Auth response was received, continue...";
   return CBState::NEXT;
 }
@@ -422,31 +419,26 @@ ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(
 
 ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(
     bufferevent *bev, void *ctx) {
-  char *line;
-  size_t line_len;
   auto input = bufferevent_get_input(bev);
-  line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+  UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
 
   if (line[0] == '-') {
-    if (isRestoringError(line)) {
+    if (isRestoringError(line.get())) {
       LOG(WARNING) << "The master was restoring the db, retry later";
     } else {
-      LOG(ERROR) << "Failed to get the db name, " << line;
+      LOG(ERROR) << "Failed to get the db name, " << line.get();
     }
-    free(line);
     return CBState::RESTART;
   }
   auto self = static_cast<ReplicationThread *>(ctx);
   std::string db_name = self->storage_->GetName();
-  if (line_len == db_name.size() && !strncmp(line, db_name.data(), line_len)) {
+  if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(), line.length)) {
     // DB name match, we should continue to next step: TryPsync
-    free(line);
     LOG(INFO) << "[replication] DB name is valid, continue...";
     return CBState::NEXT;
   }
-  LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name << ", remote: " << line;
-  free(line);
+  LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name << ", remote: " << line.get();
   return CBState::RESTART;
 }
 
@@ -462,24 +454,19 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(
 
 ReplicationThread::CBState ReplicationThread::replConfReadCB(
     bufferevent *bev, void *ctx) {
-  char *line;
-  size_t line_len;
   auto input = bufferevent_get_input(bev);
-  line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+  UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
 
-  if (line[0] == '-' && isRestoringError(line)) {
-    free(line);
+  if (line[0] == '-' && isRestoringError(line.get())) {
     LOG(WARNING) << "The master was restoring the db, retry later";
     return CBState::RESTART;
   }
-  if (strncmp(line, "+OK", 3) != 0) {
-    LOG(WARNING) << "[replication] Failed to replconf: " << line+1;
-    free(line);
+  if (strncmp(line.get(), "+OK", 3) != 0) {
+    LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
     //  backward compatible with old version that doesn't support replconf cmd
     return CBState::NEXT;
   } else {
-    free(line);
     LOG(INFO) << "[replication] replconf is ok, start psync";
     return CBState::NEXT;
   }
@@ -526,39 +513,33 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(
 
 ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
                                                              void *ctx) {
-  char *line;
-  size_t line_len;
   auto self = static_cast<ReplicationThread *>(ctx);
   auto input = bufferevent_get_input(bev);
-  line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+  UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
 
-  if (line[0] == '-' && isRestoringError(line)) {
-    free(line);
+  if (line[0] == '-' && isRestoringError(line.get())) {
     LOG(WARNING) << "The master was restoring the db, retry later";
     return CBState::RESTART;
   }
 
-  if (line[0] == '-' && isWrongPsyncNum(line)) {
+  if (line[0] == '-' && isWrongPsyncNum(line.get())) {
     self->next_try_old_psync_ = true;
-    free(line);
     LOG(WARNING) << "The old version master, can't handle new PSYNC, "
                   << "try old PSYNC again";
     // Retry previous state, i.e. send PSYNC again
     return CBState::PREV;
   }
 
-  if (strncmp(line, "+OK", 3) != 0) {
+  if (strncmp(line.get(), "+OK", 3) != 0) {
     // PSYNC isn't OK, we should use FullSync
     // Switch to fullsync state machine
     self->fullsync_steps_.Start();
-    LOG(INFO) << "[replication] Failed to psync, error: " << line
+    LOG(INFO) << "[replication] Failed to psync, error: " << line.get()
               << ", switch to fullsync";
-    free(line);
     return CBState::QUIT;
   } else {
     // PSYNC is OK, use IncrementBatchLoop
-    free(line);
     LOG(INFO) << "[replication] PSync is ok, start increment batch loop";
     return CBState::NEXT;
   }
@@ -566,26 +547,24 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
 
 ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(
     bufferevent *bev, void *ctx) {
-  char *line = nullptr;
-  size_t line_len = 0;
   char *bulk_data = nullptr;
   auto self = static_cast<ReplicationThread *>(ctx);
   self->repl_state_ = kReplConnected;
   auto input = bufferevent_get_input(bev);
   while (true) {
     switch (self->incr_state_) {
-      case Incr_batch_size:
+      case Incr_batch_size: {
         // Read bulk length
-        line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
         if (!line) return CBState::AGAIN;
-        self->incr_bulk_len_ = line_len > 0 ? std::strtoull(line + 1, nullptr, 10) : 0;
-        free(line);
+        self->incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
         if (self->incr_bulk_len_ == 0) {
           LOG(ERROR) << "[replication] Invalid increment data size";
           return CBState::RESTART;
         }
         self->incr_state_ = Incr_batch_data;
         break;
+      }
       case Incr_batch_data:
         // Read bulk data (batch data)
         if (self->incr_bulk_len_+2 <= evbuffer_get_length(input)) {  // We got enough data
@@ -623,50 +602,46 @@ ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(
 
 ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
                                                              void *ctx) {
-  char *line;
-  size_t line_len;
   auto self = static_cast<ReplicationThread *>(ctx);
   auto input = bufferevent_get_input(bev);
   switch (self->fullsync_state_) {
-    case kFetchMetaID:
+    case kFetchMetaID: {
       // New version master only sends meta file content
       if (!self->srv_->GetConfig()->master_use_repl_port) {
         self->fullsync_state_ = kFetchMetaContent;
         return CBState::AGAIN;
       }
-      line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
       if (!line) return CBState::AGAIN;
       if (line[0] == '-') {
-        LOG(ERROR) << "[replication] Failed to fetch meta id: " << line;
-        free(line);
+        LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
         return CBState::RESTART;
       }
       self->fullsync_meta_id_ = static_cast<rocksdb::BackupID>(
-          line_len > 0 ? std::strtoul(line, nullptr, 10) : 0);
-      free(line);
+          line.length > 0 ? std::strtoul(line.get(), nullptr, 10) : 0);
       if (self->fullsync_meta_id_ == 0) {
         LOG(ERROR) << "[replication] Invalid meta id received";
         return CBState::RESTART;
       }
       self->fullsync_state_ = kFetchMetaSize;
       LOG(INFO) << "[replication] Succeed fetching meta id: " << self->fullsync_meta_id_;
-    case kFetchMetaSize:
-      line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+    }
+    case kFetchMetaSize: {
+      UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
       if (!line) return CBState::AGAIN;
       if (line[0] == '-') {
-        LOG(ERROR) << "[replication] Failed to fetch meta size: " << line;
-        free(line);
+        LOG(ERROR) << "[replication] Failed to fetch meta size: " << line.get();
         return CBState::RESTART;
       }
-      self->fullsync_filesize_ = line_len > 0 ? std::strtoull(line, nullptr, 10) : 0;
-      free(line);
+      self->fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
       if (self->fullsync_filesize_ == 0) {
         LOG(ERROR) << "[replication] Invalid meta file size received";
         return CBState::RESTART;
       }
       self->fullsync_state_ = kFetchMetaContent;
       LOG(INFO) << "[replication] Succeed fetching meta size: " << self->fullsync_filesize_;
-    case kFetchMetaContent:
+    }
+    case kFetchMetaContent: {
       std::string target_dir;
       Engine::Storage::ReplDataManager::MetaInfo meta;
       // Master using old version
@@ -679,18 +654,16 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
         target_dir = self->srv_->GetConfig()->backup_sync_dir;
       } else {
         // Master using new version
-        line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+        UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
         if (!line) return CBState::AGAIN;
         if (line[0] == '-') {
-          LOG(ERROR) << "[replication] Failed to fetch meta info: " << line;
-          free(line);
+          LOG(ERROR) << "[replication] Failed to fetch meta info: " << line.get();
           return CBState::RESTART;
         }
-        std::vector<std::string> need_files = Util::Split(std::string(line), ",");
+        std::vector<std::string> need_files = Util::Split(std::string(line.get()), ",");
         for (auto f : need_files) {
           meta.files.emplace_back(f, 0);
         }
-        free(line);
 
         target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
         // Clean invalid files of checkpoint, "CURRENT" file must be invalid
@@ -750,6 +723,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
       // Switch to psync state machine again
       self->psync_steps_.Start();
       return CBState::QUIT;
+    }
   }
 
   LOG(ERROR) << "Should not arrive here";
@@ -840,31 +814,24 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
 }
 
 Status ReplicationThread::sendAuth(int sock_fd) {
-  size_t line_len;
-
   // Send auth when needed
   std::string auth = srv_->GetConfig()->masterauth;
   if (!auth.empty()) {
-    evbuffer *evbuf = evbuffer_new();
+    UniqueEvbuf evbuf;
     const auto auth_command = Redis::MultiBulkString({"AUTH", auth});
     auto s = Util::SockSend(sock_fd, auth_command);
     if (!s.IsOK()) return Status(Status::NotOK, "send auth command err:"+s.Msg());
     while (true) {
-      if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
-        evbuffer_free(evbuf);
+      if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
         return Status(Status::NotOK, std::string("read auth response err: ")+strerror(errno));
       }
-      char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+      UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
       if (!line) continue;
-      if (strncmp(line, "+OK", 3) != 0) {
-        free(line);
-        evbuffer_free(evbuf);
+      if (strncmp(line.get(), "+OK", 3) != 0) {
         return Status(Status::NotOK, "auth got invalid response");
       }
-      free(line);
       break;
     }
-    evbuffer_free(evbuf);
   }
   return Status::OK();
 }
@@ -872,24 +839,22 @@ Status ReplicationThread::sendAuth(int sock_fd) {
 Status ReplicationThread::fetchFile(int sock_fd,  evbuffer *evbuf,
                           const std::string &dir, std::string file,
                           uint32_t crc, fetch_file_callback fn) {
-  size_t line_len, file_size;
+  size_t file_size;
 
   // Read file size line
   while (true) {
-    char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+    UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
     if (!line) {
       if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
         return Status(Status::NotOK, std::string("read size: ")+strerror(errno));
       }
       continue;
     }
-    if (*line == '-') {
-      std::string msg(line);
-      free(line);
+    if (line[0] == '-') {
+      std::string msg(line.get());
       return Status(Status::NotOK, msg);
     }
-    file_size = line_len > 0 ? std::strtoull(line, nullptr, 10) : 0;
-    free(line);
+    file_size = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
     break;
   }
 
@@ -947,10 +912,10 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
   auto s = Util::SockSend(sock_fd, fetch_command);
   if (!s.IsOK()) return Status(Status::NotOK, "send fetch file command: "+s.Msg());
 
-  evbuffer *evbuf = evbuffer_new();
+  UniqueEvbuf evbuf;
   for (unsigned i = 0; i < files.size(); i++) {
     DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
-    s = fetchFile(sock_fd, evbuf, dir, files[i], crcs[i], fn);
+    s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn);
     if (!s.IsOK()) {
       s = Status(Status::NotOK, "fetch file err: " + s.Msg());
       LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg();
@@ -963,7 +928,6 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
       sleep(srv_->GetConfig()->fullsync_recv_file_delay);
     }
   }
-  evbuffer_free(evbuf);
   return s;
 }
 
diff --git a/src/slot_migrate.cc b/src/slot_migrate.cc
index 9b22eda..9273493 100644
--- a/src/slot_migrate.cc
+++ b/src/slot_migrate.cc
@@ -19,8 +19,8 @@
  */
 
 #include "slot_migrate.h"
-
 #include "batch_extractor.h"
+#include "event_util.h"
 
 static std::map<RedisType, std::string> type_to_cmd = {
   {kRedisString, "set"},
@@ -462,15 +462,13 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
   setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
 
   // Start checking response
-  size_t line_len = 0, bulk_len = 0;
-  char *line = nullptr;
+  size_t bulk_len = 0;
   int cnt = 0;
   stat_ = ArrayLen;
-  evbuffer *evbuf = evbuffer_new();
+  UniqueEvbuf evbuf;
   while (true) {
     // Read response data from socket buffer to event buffer
-    if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
-      evbuffer_free(evbuf);
+    if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
       LOG(ERROR) << "[migrate] Failed to read response, Err: " + std::string(strerror(errno));
       return false;
     }
@@ -481,7 +479,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
       switch (stat_) {
         // Handle single string response
         case ArrayLen: {
-          line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+          UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
           if (!line) {
             LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
             run = false;
@@ -489,12 +487,12 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
           }
 
           if (line[0] == '-') {
-            LOG(ERROR) << "[migrate] Got invalid response: " + std::string(line)
-                << ", line length: " << line_len;
+            LOG(ERROR) << "[migrate] Got invalid response: " + std::string(line.get())
+                << ", line length: " << line.length;
             stat_ = Error;
           } else if (line[0] == '$') {
             try {
-              bulk_len = std::stoull(std::string(line + 1, line_len - 1));
+              bulk_len = std::stoull(std::string(line.get() + 1, line.length - 1));
               stat_ = bulk_len > 0 ? BulkData : OneRspEnd;
             } catch (const std::exception &e) {
               LOG(ERROR) << "[migrate] Protocol Err: expect integer";
@@ -503,22 +501,21 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
           } else if (line[0] == '+' || line[0] == ':') {
               stat_ = OneRspEnd;
           } else {
-            LOG(ERROR) << "[migrate] Unexpected response: " << line;
+            LOG(ERROR) << "[migrate] Unexpected response: " << line.get();
             stat_ = Error;
           }
 
-          free(line);
           break;
         }
         // Handle bulk string response
         case BulkData: {
-          if (evbuffer_get_length(evbuf) < bulk_len + 2) {
+          if (evbuffer_get_length(evbuf.get()) < bulk_len + 2) {
             LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
             run = false;
             break;
           }
           // TODO(chrisZMF): Check tail '\r\n'
-          evbuffer_drain(evbuf, bulk_len + 2);
+          evbuffer_drain(evbuf.get(), bulk_len + 2);
           bulk_len = 0;
           stat_ = OneRspEnd;
           break;
@@ -526,14 +523,12 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
         case OneRspEnd: {
           cnt++;
           if (cnt >= total) {
-            evbuffer_free(evbuf);
             return true;
           }
           stat_ = ArrayLen;
           break;
         }
         case Error: {
-          evbuffer_free(evbuf);
           return false;
         }
         default: break;
diff --git a/src/storage.cc b/src/storage.cc
index 4e474e7..2be8f42 100644
--- a/src/storage.cc
+++ b/src/storage.cc
@@ -44,6 +44,7 @@
 #include "event_listener.h"
 #include "compact_filter.h"
 #include "table_properties_collector.h"
+#include "event_util.h"
 
 namespace Engine {
 
@@ -848,8 +849,6 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage,
 
 Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(
     Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf) {
-  char *line;
-  size_t len;
   Storage::ReplDataManager::MetaInfo meta;
   auto meta_file = "meta/" + std::to_string(meta_id);
   DLOG(INFO) << "[meta] id: " << meta_id;
@@ -862,39 +861,34 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(
   wf->Close();
 
   // timestamp;
-  line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
-  DLOG(INFO) << "[meta] timestamp: " << line;
-  meta.timestamp = std::strtoll(line, nullptr, 10);
-  free(line);
+  UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
+  DLOG(INFO) << "[meta] timestamp: " << line.get();
+  meta.timestamp = std::strtoll(line.get(), nullptr, 10);
   // sequence
-  line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
-  DLOG(INFO) << "[meta] seq:" << line;
-  meta.seq = std::strtoull(line, nullptr, 10);
-  free(line);
+  line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+  DLOG(INFO) << "[meta] seq:" << line.get();
+  meta.seq = std::strtoull(line.get(), nullptr, 10);
   // optional metadata
-  line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
-  if (strncmp(line, "metadata", 8) == 0) {
-    DLOG(INFO) << "[meta] meta: " << line;
-    meta.meta_data = std::string(line, len);
-    free(line);
-    line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
-  }
-  DLOG(INFO) << "[meta] file count: " << line;
-  free(line);
+  line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+  if (strncmp(line.get(), "metadata", 8) == 0) {
+    DLOG(INFO) << "[meta] meta: " << line.get();
+    meta.meta_data = std::string(line.get(), line.length);
+    line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+  }
+  DLOG(INFO) << "[meta] file count: " << line.get();
   // file list
   while (true) {
-    line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
+    line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
     if (!line) {
       break;
     }
-    DLOG(INFO) << "[meta] file info: " << line;
-    auto cptr = line;
+    DLOG(INFO) << "[meta] file info: " << line.get();
+    auto cptr = line.get();
     while (*(cptr++) != ' ') {}
-    auto filename = std::string(line, cptr - line - 1);
+    auto filename = std::string(line.get(), cptr - line.get() - 1);
     while (*(cptr++) != ' ') {}
     auto crc32 = std::strtoul(cptr, nullptr, 10);
     meta.files.emplace_back(filename, crc32);
-    free(line);
   }
   SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
   return meta;
diff --git a/src/util.cc b/src/util.cc
index e1a360b..21d633f 100644
--- a/src/util.cc
+++ b/src/util.cc
@@ -43,6 +43,7 @@
 
 #include "util.h"
 #include "status.h"
+#include "event_util.h"
 
 #ifndef POLLIN
 # define POLLIN      0x0001    /* There is data to read */
@@ -287,21 +288,15 @@ Status SockSetBlocking(int fd, int blocking) {
 }
 
 Status SockReadLine(int fd, std::string *data) {
-  size_t line_len;
-  evbuffer *evbuf = evbuffer_new();
-  if (evbuffer_read(evbuf, fd, -1) <= 0) {
-    evbuffer_free(evbuf);
+  UniqueEvbuf evbuf;
+  if (evbuffer_read(evbuf.get(), fd, -1) <= 0) {
     return Status(Status::NotOK, std::string("read response err: ") + strerror(errno));
   }
-  char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+  UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
   if (!line) {
-    free(line);
-    evbuffer_free(evbuf);
     return Status(Status::NotOK, std::string("read response err(empty): ") + strerror(errno));
   }
-  *data = std::string(line, line_len);
-  free(line);
-  evbuffer_free(evbuf);
+  *data = std::string(line.get(), line.length);
   return Status::OK();
 }