You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by ti...@apache.org on 2022/06/26 01:26:54 UTC
[incubator-kvrocks] branch unstable updated: Add some RAII type for evbuffer to avoid manual free (#645)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new dd25baf Add some RAII type for evbuffer to avoid manual free (#645)
dd25baf is described below
commit dd25baf3fbf145a17fcb63158f5166e78ee19e6d
Author: Twice <tw...@gmail.com>
AuthorDate: Sun Jun 26 09:26:50 2022 +0800
Add some RAII type for evbuffer to avoid manual free (#645)
---
.github/workflows/kvrocks.yaml | 2 +-
cppcheck.sh | 10 +++-
src/event_util.h | 59 ++++++++++++++++++
src/redis_request.cc | 39 +++++-------
src/replication.cc | 132 +++++++++++++++--------------------------
src/slot_migrate.cc | 27 ++++-----
src/storage.cc | 42 ++++++-------
src/util.cc | 15 ++---
8 files changed, 165 insertions(+), 161 deletions(-)
diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml
index 2ce5eae..6e367d8 100644
--- a/.github/workflows/kvrocks.yaml
+++ b/.github/workflows/kvrocks.yaml
@@ -39,7 +39,7 @@ jobs:
lint:
name: Lint and check code
- runs-on: ubuntu-18.04
+ runs-on: ubuntu-latest
steps:
- name: Checkout Code Base
uses: actions/checkout@v3
diff --git a/cppcheck.sh b/cppcheck.sh
index 439df36..752ec78 100755
--- a/cppcheck.sh
+++ b/cppcheck.sh
@@ -17,8 +17,14 @@
# specific language governing permissions and limitations
# under the License.
-CHECK_TYPES="warning,portability,information,missingInclude"
+# we should run cmake configuration to fetch deps if we want to enable missingInclude
+CHECK_TYPES="warning,portability,information"
STANDARD=c++11
ERROR_EXITCODE=1
LANG=c++
-cppcheck --force --enable=${CHECK_TYPES} -U__GNUC__ -x ${LANG} src --std=${STANDARD} --error-exitcode=${ERROR_EXITCODE}
+
+set -ex
+cppcheck --version
+cppcheck \
+ --force --enable=${CHECK_TYPES} -U__GNUC__ -x ${LANG} src --std=${STANDARD} --error-exitcode=${ERROR_EXITCODE} \
+ --inline-suppr --suppress=noCopyConstructor:src/server.cc --suppress=noOperatorEq:src/server.cc -j$(nproc)
diff --git a/src/event_util.h b/src/event_util.h
new file mode 100644
index 0000000..81c06b5
--- /dev/null
+++ b/src/event_util.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <cstdlib>
+#include <utility>
+
+#include "event2/buffer.h"
+
+template <typename F, F *f> struct StaticFunction {
+ template <typename... Ts> auto operator()(Ts &&...args) const -> decltype(f(std::forward<Ts>(args)...)) {
+ return f(std::forward<Ts>(args)...);
+ }
+};
+
+using StaticFree = StaticFunction<decltype(std::free), std::free>;
+
+template <typename T>
+struct UniqueFreePtr : std::unique_ptr<T, StaticFree> {
+ using base_type = std::unique_ptr<T, StaticFree>;
+
+ using base_type::base_type;
+};
+
+struct UniqueEvbufReadln : UniqueFreePtr<char[]> {
+ // cppcheck-suppress uninitMemberVar
+ UniqueEvbufReadln(evbuffer* buffer, evbuffer_eol_style eol_style)
+ : UniqueFreePtr(evbuffer_readln(buffer, &length, eol_style)) {}
+
+ size_t length;
+};
+
+using StaticEvbufFree = StaticFunction<decltype(evbuffer_free), evbuffer_free>;
+
+struct UniqueEvbuf : std::unique_ptr<evbuffer, StaticEvbufFree> {
+ using base_type = std::unique_ptr<evbuffer, StaticEvbufFree>;
+
+ UniqueEvbuf() : base_type(evbuffer_new()) {}
+ explicit UniqueEvbuf(evbuffer *buffer) : base_type(buffer) {}
+};
diff --git a/src/redis_request.cc b/src/redis_request.cc
index 1fdd5e9..158f531 100644
--- a/src/redis_request.cc
+++ b/src/redis_request.cc
@@ -31,6 +31,7 @@
#include "redis_connection.h"
#include "server.h"
#include "redis_slot.h"
+#include "event_util.h"
namespace Redis {
const size_t PROTO_INLINE_MAX_SIZE = 16 * 1024L;
@@ -38,74 +39,64 @@ const size_t PROTO_BULK_MAX_SIZE = 512 * 1024L * 1024L;
const size_t PROTO_MULTI_MAX_SIZE = 1024 * 1024L;
Status Request::Tokenize(evbuffer *input) {
- char *line;
- size_t len;
size_t pipeline_size = 0;
while (true) {
switch (state_) {
- case ArrayLen:
- line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
- if (!line || len <= 0) {
+ case ArrayLen: {
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
+ if (!line || line.length <= 0) {
if (pipeline_size > 128) {
LOG(INFO) << "Large pipeline detected: " << pipeline_size;
}
if (line) {
- free(line);
continue;
}
return Status::OK();
}
pipeline_size++;
- svr_->stats_.IncrInbondBytes(len);
+ svr_->stats_.IncrInbondBytes(line.length);
if (line[0] == '*') {
try {
- multi_bulk_len_ = std::stoll(std::string(line + 1, len-1));
+ multi_bulk_len_ = std::stoll(std::string(line.get() + 1, line.length - 1));
} catch (std::exception &e) {
- free(line);
return Status(Status::NotOK, "Protocol error: invalid multibulk length");
}
if (multi_bulk_len_ <= 0) {
multi_bulk_len_ = 0;
- free(line);
continue;
}
if (multi_bulk_len_ > (int64_t)PROTO_MULTI_MAX_SIZE) {
- free(line);
return Status(Status::NotOK, "Protocol error: invalid multibulk length");
}
state_ = BulkLen;
} else {
- if (len > PROTO_INLINE_MAX_SIZE) {
- free(line);
+ if (line.length > PROTO_INLINE_MAX_SIZE) {
return Status(Status::NotOK, "Protocol error: invalid bulk length");
}
- tokens_ = Util::Split(std::string(line, len), " \t");
+ tokens_ = Util::Split(std::string(line.get(), line.length), " \t");
commands_.emplace_back(std::move(tokens_));
state_ = ArrayLen;
}
- free(line);
break;
- case BulkLen:
- line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
- if (!line || len <= 0) return Status::OK();
- svr_->stats_.IncrInbondBytes(len);
+ }
+ case BulkLen: {
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
+ if (!line || line.length <= 0) return Status::OK();
+ svr_->stats_.IncrInbondBytes(line.length);
if (line[0] != '$') {
- free(line);
return Status(Status::NotOK, "Protocol error: expected '$'");
}
try {
- bulk_len_ = std::stoull(std::string(line + 1, len-1));
+ bulk_len_ = std::stoull(std::string(line.get() + 1, line.length - 1));
} catch (std::exception &e) {
- free(line);
return Status(Status::NotOK, "Protocol error: invalid bulk length");
}
if (bulk_len_ > PROTO_BULK_MAX_SIZE) {
- free(line);
return Status(Status::NotOK, "Protocol error: invalid bulk length");
}
- free(line);
state_ = BulkData;
break;
+ }
case BulkData:
if (evbuffer_get_length(input) < bulk_len_ + 2) return Status::OK();
char *data = reinterpret_cast<char *>(evbuffer_pullup(input, bulk_len_ + 2));
diff --git a/src/replication.cc b/src/replication.cc
index d08f77c..76bd4f7 100644
--- a/src/replication.cc
+++ b/src/replication.cc
@@ -37,6 +37,7 @@
#include "util.h"
#include "status.h"
#include "server.h"
+#include "event_util.h"
FeedSlaveThread::~FeedSlaveThread() {
delete conn_;
@@ -395,18 +396,14 @@ ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev,
ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev,
void *ctx) {
- char *line;
- size_t line_len;
auto input = bufferevent_get_input(bev);
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- if (strncmp(line, "+OK", 3) != 0) {
+ if (strncmp(line.get(), "+OK", 3) != 0) {
// Auth failed
- LOG(ERROR) << "[replication] Auth failed: " << line;
- free(line);
+ LOG(ERROR) << "[replication] Auth failed: " << line.get();
return CBState::RESTART;
}
- free(line);
LOG(INFO) << "[replication] Auth response was received, continue...";
return CBState::NEXT;
}
@@ -422,31 +419,26 @@ ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(
ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(
bufferevent *bev, void *ctx) {
- char *line;
- size_t line_len;
auto input = bufferevent_get_input(bev);
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- if (isRestoringError(line)) {
+ if (isRestoringError(line.get())) {
LOG(WARNING) << "The master was restoring the db, retry later";
} else {
- LOG(ERROR) << "Failed to get the db name, " << line;
+ LOG(ERROR) << "Failed to get the db name, " << line.get();
}
- free(line);
return CBState::RESTART;
}
auto self = static_cast<ReplicationThread *>(ctx);
std::string db_name = self->storage_->GetName();
- if (line_len == db_name.size() && !strncmp(line, db_name.data(), line_len)) {
+ if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(), line.length)) {
// DB name match, we should continue to next step: TryPsync
- free(line);
LOG(INFO) << "[replication] DB name is valid, continue...";
return CBState::NEXT;
}
- LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name << ", remote: " << line;
- free(line);
+ LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name << ", remote: " << line.get();
return CBState::RESTART;
}
@@ -462,24 +454,19 @@ ReplicationThread::CBState ReplicationThread::replConfWriteCB(
ReplicationThread::CBState ReplicationThread::replConfReadCB(
bufferevent *bev, void *ctx) {
- char *line;
- size_t line_len;
auto input = bufferevent_get_input(bev);
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- if (line[0] == '-' && isRestoringError(line)) {
- free(line);
+ if (line[0] == '-' && isRestoringError(line.get())) {
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}
- if (strncmp(line, "+OK", 3) != 0) {
- LOG(WARNING) << "[replication] Failed to replconf: " << line+1;
- free(line);
+ if (strncmp(line.get(), "+OK", 3) != 0) {
+ LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
// backward compatible with old version that doesn't support replconf cmd
return CBState::NEXT;
} else {
- free(line);
LOG(INFO) << "[replication] replconf is ok, start psync";
return CBState::NEXT;
}
@@ -526,39 +513,33 @@ ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(
ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
void *ctx) {
- char *line;
- size_t line_len;
auto self = static_cast<ReplicationThread *>(ctx);
auto input = bufferevent_get_input(bev);
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- if (line[0] == '-' && isRestoringError(line)) {
- free(line);
+ if (line[0] == '-' && isRestoringError(line.get())) {
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}
- if (line[0] == '-' && isWrongPsyncNum(line)) {
+ if (line[0] == '-' && isWrongPsyncNum(line.get())) {
self->next_try_old_psync_ = true;
- free(line);
LOG(WARNING) << "The old version master, can't handle new PSYNC, "
<< "try old PSYNC again";
// Retry previous state, i.e. send PSYNC again
return CBState::PREV;
}
- if (strncmp(line, "+OK", 3) != 0) {
+ if (strncmp(line.get(), "+OK", 3) != 0) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
self->fullsync_steps_.Start();
- LOG(INFO) << "[replication] Failed to psync, error: " << line
+ LOG(INFO) << "[replication] Failed to psync, error: " << line.get()
<< ", switch to fullsync";
- free(line);
return CBState::QUIT;
} else {
// PSYNC is OK, use IncrementBatchLoop
- free(line);
LOG(INFO) << "[replication] PSync is ok, start increment batch loop";
return CBState::NEXT;
}
@@ -566,26 +547,24 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(
bufferevent *bev, void *ctx) {
- char *line = nullptr;
- size_t line_len = 0;
char *bulk_data = nullptr;
auto self = static_cast<ReplicationThread *>(ctx);
self->repl_state_ = kReplConnected;
auto input = bufferevent_get_input(bev);
while (true) {
switch (self->incr_state_) {
- case Incr_batch_size:
+ case Incr_batch_size: {
// Read bulk length
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- self->incr_bulk_len_ = line_len > 0 ? std::strtoull(line + 1, nullptr, 10) : 0;
- free(line);
+ self->incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, nullptr, 10) : 0;
if (self->incr_bulk_len_ == 0) {
LOG(ERROR) << "[replication] Invalid increment data size";
return CBState::RESTART;
}
self->incr_state_ = Incr_batch_data;
break;
+ }
case Incr_batch_data:
// Read bulk data (batch data)
if (self->incr_bulk_len_+2 <= evbuffer_get_length(input)) { // We got enough data
@@ -623,50 +602,46 @@ ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(
ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
void *ctx) {
- char *line;
- size_t line_len;
auto self = static_cast<ReplicationThread *>(ctx);
auto input = bufferevent_get_input(bev);
switch (self->fullsync_state_) {
- case kFetchMetaID:
+ case kFetchMetaID: {
// New version master only sends meta file content
if (!self->srv_->GetConfig()->master_use_repl_port) {
self->fullsync_state_ = kFetchMetaContent;
return CBState::AGAIN;
}
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta id: " << line;
- free(line);
+ LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
return CBState::RESTART;
}
self->fullsync_meta_id_ = static_cast<rocksdb::BackupID>(
- line_len > 0 ? std::strtoul(line, nullptr, 10) : 0);
- free(line);
+ line.length > 0 ? std::strtoul(line.get(), nullptr, 10) : 0);
if (self->fullsync_meta_id_ == 0) {
LOG(ERROR) << "[replication] Invalid meta id received";
return CBState::RESTART;
}
self->fullsync_state_ = kFetchMetaSize;
LOG(INFO) << "[replication] Succeed fetching meta id: " << self->fullsync_meta_id_;
- case kFetchMetaSize:
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ }
+ case kFetchMetaSize: {
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta size: " << line;
- free(line);
+ LOG(ERROR) << "[replication] Failed to fetch meta size: " << line.get();
return CBState::RESTART;
}
- self->fullsync_filesize_ = line_len > 0 ? std::strtoull(line, nullptr, 10) : 0;
- free(line);
+ self->fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
if (self->fullsync_filesize_ == 0) {
LOG(ERROR) << "[replication] Invalid meta file size received";
return CBState::RESTART;
}
self->fullsync_state_ = kFetchMetaContent;
LOG(INFO) << "[replication] Succeed fetching meta size: " << self->fullsync_filesize_;
- case kFetchMetaContent:
+ }
+ case kFetchMetaContent: {
std::string target_dir;
Engine::Storage::ReplDataManager::MetaInfo meta;
// Master using old version
@@ -679,18 +654,16 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
target_dir = self->srv_->GetConfig()->backup_sync_dir;
} else {
// Master using new version
- line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta info: " << line;
- free(line);
+ LOG(ERROR) << "[replication] Failed to fetch meta info: " << line.get();
return CBState::RESTART;
}
- std::vector<std::string> need_files = Util::Split(std::string(line), ",");
+ std::vector<std::string> need_files = Util::Split(std::string(line.get()), ",");
for (auto f : need_files) {
meta.files.emplace_back(f, 0);
}
- free(line);
target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
// Clean invalid files of checkpoint, "CURRENT" file must be invalid
@@ -750,6 +723,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
// Switch to psync state machine again
self->psync_steps_.Start();
return CBState::QUIT;
+ }
}
LOG(ERROR) << "Should not arrive here";
@@ -840,31 +814,24 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
}
Status ReplicationThread::sendAuth(int sock_fd) {
- size_t line_len;
-
// Send auth when needed
std::string auth = srv_->GetConfig()->masterauth;
if (!auth.empty()) {
- evbuffer *evbuf = evbuffer_new();
+ UniqueEvbuf evbuf;
const auto auth_command = Redis::MultiBulkString({"AUTH", auth});
auto s = Util::SockSend(sock_fd, auth_command);
if (!s.IsOK()) return Status(Status::NotOK, "send auth command err:"+s.Msg());
while (true) {
- if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
- evbuffer_free(evbuf);
+ if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
return Status(Status::NotOK, std::string("read auth response err: ")+strerror(errno));
}
- char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) continue;
- if (strncmp(line, "+OK", 3) != 0) {
- free(line);
- evbuffer_free(evbuf);
+ if (strncmp(line.get(), "+OK", 3) != 0) {
return Status(Status::NotOK, "auth got invalid response");
}
- free(line);
break;
}
- evbuffer_free(evbuf);
}
return Status::OK();
}
@@ -872,24 +839,22 @@ Status ReplicationThread::sendAuth(int sock_fd) {
Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
const std::string &dir, std::string file,
uint32_t crc, fetch_file_callback fn) {
- size_t line_len, file_size;
+ size_t file_size;
// Read file size line
while (true) {
- char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
return Status(Status::NotOK, std::string("read size: ")+strerror(errno));
}
continue;
}
- if (*line == '-') {
- std::string msg(line);
- free(line);
+ if (line[0] == '-') {
+ std::string msg(line.get());
return Status(Status::NotOK, msg);
}
- file_size = line_len > 0 ? std::strtoull(line, nullptr, 10) : 0;
- free(line);
+ file_size = line.length > 0 ? std::strtoull(line.get(), nullptr, 10) : 0;
break;
}
@@ -947,10 +912,10 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
auto s = Util::SockSend(sock_fd, fetch_command);
if (!s.IsOK()) return Status(Status::NotOK, "send fetch file command: "+s.Msg());
- evbuffer *evbuf = evbuffer_new();
+ UniqueEvbuf evbuf;
for (unsigned i = 0; i < files.size(); i++) {
DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
- s = fetchFile(sock_fd, evbuf, dir, files[i], crcs[i], fn);
+ s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn);
if (!s.IsOK()) {
s = Status(Status::NotOK, "fetch file err: " + s.Msg());
LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg();
@@ -963,7 +928,6 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
sleep(srv_->GetConfig()->fullsync_recv_file_delay);
}
}
- evbuffer_free(evbuf);
return s;
}
diff --git a/src/slot_migrate.cc b/src/slot_migrate.cc
index 9b22eda..9273493 100644
--- a/src/slot_migrate.cc
+++ b/src/slot_migrate.cc
@@ -19,8 +19,8 @@
*/
#include "slot_migrate.h"
-
#include "batch_extractor.h"
+#include "event_util.h"
static std::map<RedisType, std::string> type_to_cmd = {
{kRedisString, "set"},
@@ -462,15 +462,13 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
// Start checking response
- size_t line_len = 0, bulk_len = 0;
- char *line = nullptr;
+ size_t bulk_len = 0;
int cnt = 0;
stat_ = ArrayLen;
- evbuffer *evbuf = evbuffer_new();
+ UniqueEvbuf evbuf;
while (true) {
// Read response data from socket buffer to event buffer
- if (evbuffer_read(evbuf, sock_fd, -1) <= 0) {
- evbuffer_free(evbuf);
+ if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
LOG(ERROR) << "[migrate] Failed to read response, Err: " + std::string(strerror(errno));
return false;
}
@@ -481,7 +479,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
switch (stat_) {
// Handle single string response
case ArrayLen: {
- line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
run = false;
@@ -489,12 +487,12 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
}
if (line[0] == '-') {
- LOG(ERROR) << "[migrate] Got invalid response: " + std::string(line)
- << ", line length: " << line_len;
+ LOG(ERROR) << "[migrate] Got invalid response: " + std::string(line.get())
+ << ", line length: " << line.length;
stat_ = Error;
} else if (line[0] == '$') {
try {
- bulk_len = std::stoull(std::string(line + 1, line_len - 1));
+ bulk_len = std::stoull(std::string(line.get() + 1, line.length - 1));
stat_ = bulk_len > 0 ? BulkData : OneRspEnd;
} catch (const std::exception &e) {
LOG(ERROR) << "[migrate] Protocol Err: expect integer";
@@ -503,22 +501,21 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
} else if (line[0] == '+' || line[0] == ':') {
stat_ = OneRspEnd;
} else {
- LOG(ERROR) << "[migrate] Unexpected response: " << line;
+ LOG(ERROR) << "[migrate] Unexpected response: " << line.get();
stat_ = Error;
}
- free(line);
break;
}
// Handle bulk string response
case BulkData: {
- if (evbuffer_get_length(evbuf) < bulk_len + 2) {
+ if (evbuffer_get_length(evbuf.get()) < bulk_len + 2) {
LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
run = false;
break;
}
// TODO(chrisZMF): Check tail '\r\n'
- evbuffer_drain(evbuf, bulk_len + 2);
+ evbuffer_drain(evbuf.get(), bulk_len + 2);
bulk_len = 0;
stat_ = OneRspEnd;
break;
@@ -526,14 +523,12 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
case OneRspEnd: {
cnt++;
if (cnt >= total) {
- evbuffer_free(evbuf);
return true;
}
stat_ = ArrayLen;
break;
}
case Error: {
- evbuffer_free(evbuf);
return false;
}
default: break;
diff --git a/src/storage.cc b/src/storage.cc
index 4e474e7..2be8f42 100644
--- a/src/storage.cc
+++ b/src/storage.cc
@@ -44,6 +44,7 @@
#include "event_listener.h"
#include "compact_filter.h"
#include "table_properties_collector.h"
+#include "event_util.h"
namespace Engine {
@@ -848,8 +849,6 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage,
Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(
Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf) {
- char *line;
- size_t len;
Storage::ReplDataManager::MetaInfo meta;
auto meta_file = "meta/" + std::to_string(meta_id);
DLOG(INFO) << "[meta] id: " << meta_id;
@@ -862,39 +861,34 @@ Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(
wf->Close();
// timestamp;
- line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
- DLOG(INFO) << "[meta] timestamp: " << line;
- meta.timestamp = std::strtoll(line, nullptr, 10);
- free(line);
+ UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_LF);
+ DLOG(INFO) << "[meta] timestamp: " << line.get();
+ meta.timestamp = std::strtoll(line.get(), nullptr, 10);
// sequence
- line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
- DLOG(INFO) << "[meta] seq:" << line;
- meta.seq = std::strtoull(line, nullptr, 10);
- free(line);
+ line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+ DLOG(INFO) << "[meta] seq:" << line.get();
+ meta.seq = std::strtoull(line.get(), nullptr, 10);
// optional metadata
- line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
- if (strncmp(line, "metadata", 8) == 0) {
- DLOG(INFO) << "[meta] meta: " << line;
- meta.meta_data = std::string(line, len);
- free(line);
- line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
- }
- DLOG(INFO) << "[meta] file count: " << line;
- free(line);
+ line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+ if (strncmp(line.get(), "metadata", 8) == 0) {
+ DLOG(INFO) << "[meta] meta: " << line.get();
+ meta.meta_data = std::string(line.get(), line.length);
+ line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
+ }
+ DLOG(INFO) << "[meta] file count: " << line.get();
// file list
while (true) {
- line = evbuffer_readln(evbuf, &len, EVBUFFER_EOL_LF);
+ line = UniqueEvbufReadln(evbuf, EVBUFFER_EOL_LF);
if (!line) {
break;
}
- DLOG(INFO) << "[meta] file info: " << line;
- auto cptr = line;
+ DLOG(INFO) << "[meta] file info: " << line.get();
+ auto cptr = line.get();
while (*(cptr++) != ' ') {}
- auto filename = std::string(line, cptr - line - 1);
+ auto filename = std::string(line.get(), cptr - line.get() - 1);
while (*(cptr++) != ' ') {}
auto crc32 = std::strtoul(cptr, nullptr, 10);
meta.files.emplace_back(filename, crc32);
- free(line);
}
SwapTmpFile(storage, storage->config_->backup_sync_dir, meta_file);
return meta;
diff --git a/src/util.cc b/src/util.cc
index e1a360b..21d633f 100644
--- a/src/util.cc
+++ b/src/util.cc
@@ -43,6 +43,7 @@
#include "util.h"
#include "status.h"
+#include "event_util.h"
#ifndef POLLIN
# define POLLIN 0x0001 /* There is data to read */
@@ -287,21 +288,15 @@ Status SockSetBlocking(int fd, int blocking) {
}
Status SockReadLine(int fd, std::string *data) {
- size_t line_len;
- evbuffer *evbuf = evbuffer_new();
- if (evbuffer_read(evbuf, fd, -1) <= 0) {
- evbuffer_free(evbuf);
+ UniqueEvbuf evbuf;
+ if (evbuffer_read(evbuf.get(), fd, -1) <= 0) {
return Status(Status::NotOK, std::string("read response err: ") + strerror(errno));
}
- char *line = evbuffer_readln(evbuf, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+ UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
- free(line);
- evbuffer_free(evbuf);
return Status(Status::NotOK, std::string("read response err(empty): ") + strerror(errno));
}
- *data = std::string(line, line_len);
- free(line);
- evbuffer_free(evbuf);
+ *data = std::string(line.get(), line.length);
return Status::OK();
}