You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/27 08:13:00 UTC
[incubator-pegasus] branch master updated: refactor: return Status::code instead of meanless integer (#1417)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 58bad7471 refactor: return Status::code instead of meanless integer (#1417)
58bad7471 is described below
commit 58bad7471dc2f9f3ed716ad475750bd0ea0f53f9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Mon Mar 27 16:12:53 2023 +0800
refactor: return Status::code instead of meanless integer (#1417)
https://github.com/apache/incubator-pegasus/issues/1383
To handle the return code of read and write requests, it would be great
to refactor the return code of the related functions.
This patch change to use rocksdb::Status::code insteadn of meanless integer,
and left some TODOs to be dealt in follow up patchs.
---
src/common/storage_serverlet.h | 2 ++
src/replica/replica.cpp | 1 +
src/replica/replication_app_base.cpp | 14 +++++++-------
src/replica/replication_app_base.h | 5 +++--
src/server/pegasus_read_service.h | 8 ++------
src/server/pegasus_server_write.cpp | 14 ++++++++------
src/server/pegasus_server_write.h | 2 +-
src/server/pegasus_write_service.cpp | 4 ++--
src/server/pegasus_write_service.h | 8 ++++----
src/server/pegasus_write_service_impl.h | 22 +++++++++++-----------
src/server/rocksdb_wrapper.cpp | 2 +-
src/server/rocksdb_wrapper.h | 7 ++++---
12 files changed, 46 insertions(+), 43 deletions(-)
diff --git a/src/common/storage_serverlet.h b/src/common/storage_serverlet.h
index 8eb7c67e0..30fa56287 100644
--- a/src/common/storage_serverlet.h
+++ b/src/common/storage_serverlet.h
@@ -122,11 +122,13 @@ protected:
return nullptr;
}
+ // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
int handle_request(dsn::message_ex *request)
{
dsn::task_code t = request->rpc_code();
const rpc_handler *ptr = find_handler(t);
if (ptr != nullptr) {
+ // TODO(yingchun): add return value
(*ptr)(static_cast<T *>(this), request);
} else {
LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ",
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index ef363b2fe..be2cbd309 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -299,6 +299,7 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
uint64_t start_time_ns = dsn_now_ns();
CHECK(_app, "");
+ // TODO(yingchun): check the return value.
_app->on_request(request);
// If the corresponding perf counter exist, count the duration of this operation.
diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp
index 596a4e674..5ae07b90b 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -26,6 +26,7 @@
#include <alloca.h>
#include <fcntl.h>
+#include <rocksdb/status.h>
#include <algorithm>
#include <fstream>
#include <memory>
@@ -373,12 +374,11 @@ int replication_app_base::on_batched_write_requests(int64_t decree,
message_ex **requests,
int request_length)
{
- int storage_error = 0;
+ int storage_error = rocksdb::Status::kOk;
for (int i = 0; i < request_length; ++i) {
- // TODO(yingchun): better to return error_code
int e = on_request(requests[i]);
- if (e != 0) {
- LOG_ERROR_PREFIX("got storage error when handler request({})",
+ if (e != rocksdb::Status::kOk) {
+ LOG_ERROR_PREFIX("got storage engine error when handler request({})",
requests[i]->header->rpc_name);
storage_error = e;
}
@@ -425,7 +425,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
}
}
- int perror = on_batched_write_requests(
+ int storage_error = on_batched_write_requests(
mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count);
// release faked requests
@@ -433,8 +433,8 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
faked_requests[i]->release_ref();
}
- if (perror != 0) {
- LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
+ if (storage_error != rocksdb::Status::kOk) {
+ LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), storage_error);
// For normal write requests, if got rocksdb error, this replica will be set error and evoke
// learn.
// For ingestion requests, should not do as normal write requests, there are two reasons:
diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h
index 9e9257e95..e0e292d6d 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -215,7 +215,7 @@ public:
//
virtual replication::decree last_durable_decree() const = 0;
virtual replication::decree last_flushed_decree() const { return last_durable_decree(); }
- // return the error generated by storage engine
+ // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) = 0;
//
@@ -225,8 +225,9 @@ public:
// The base class gives a naive implementation that just call on_request
// repeatedly. Storage engine may override this function to get better performance.
//
+ // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_batched_write_requests(int64_t decree,
- uint64_t timstamp,
+ uint64_t timestamp,
message_ex **requests,
int request_length);
diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index 697274d83..163a529d3 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -42,12 +42,8 @@ public:
pegasus_read_service(dsn::replication::replica *r) : dsn::replication::replication_app_base(r)
{
}
- virtual ~pegasus_read_service() {}
- virtual int on_request(dsn::message_ex *request) override
- {
- handle_request(request);
- return 0;
- }
+
+ int on_request(dsn::message_ex *request) override { return handle_request(request); }
protected:
// all service handlers to be implemented further
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 8daacc48b..0147050a6 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <rocksdb/status.h>
#include <stdio.h>
#include <thrift/transport/TTransportException.h>
#include <algorithm>
@@ -84,7 +85,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
- return 0;
+ return rocksdb::Status::kOk;
}
return on_batched_writes(requests, count);
@@ -94,7 +95,7 @@ void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_defau
int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
- int err = 0;
+ int err = rocksdb::Status::kOk;
{
_write_svc->batch_prepare(_decree);
@@ -104,7 +105,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they are failed,
// since we need to record the total qps and rpc latencies,
// and respond for all RPCs regardless of their result.
- int local_err = 0;
+ int local_err = rocksdb::Status::kOk;
try {
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
@@ -130,13 +131,14 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
ex.what());
}
- if (!err && local_err) {
+ if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
err = local_err;
}
}
- if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
- _write_svc->batch_abort(_decree, err == 0 ? -1 : err);
+ if (dsn_unlikely(err != rocksdb::Status::kOk ||
+ (_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
+ _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : err);
} else {
err = _write_svc->batch_commit(_decree);
}
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index 7bf35b347..6a002c299 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -52,7 +52,7 @@ public:
/// Error returned is regarded as the failure of replica, thus will trigger
/// cluster membership changes. Make sure no error is returned because of
/// invalid user argument.
- /// As long as the returned error is 0, the operation is guaranteed to be
+ /// As long as the returned error is rocksdb::Status::kOk, the operation is guaranteed to be
/// successfully applied into rocksdb, which means an empty_put will be called
/// even if there's no write.
int on_batched_write_requests(dsn::message_ex **requests,
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index dd82e205d..73f6cc8d6 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -386,7 +386,7 @@ int pegasus_write_service::duplicate(int64_t decree,
remove_rpc remove;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
- int err = 0;
+ int err = rocksdb::Status::kOk;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
put = put_rpc(write);
err = _impl->batch_put(ctx, put.request(), put.response());
@@ -422,7 +422,7 @@ int pegasus_write_service::ingest_files(int64_t decree,
resp.err = dsn::ERR_OK;
// write empty put to flush decree
resp.rocksdb_error = empty_put(decree);
- if (resp.rocksdb_error != 0) {
+ if (resp.rocksdb_error != rocksdb::Status::kOk) {
resp.err = dsn::ERR_TRY_AGAIN;
return resp.rocksdb_error;
}
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 4136f02b6..9fb854ffd 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -176,20 +176,20 @@ public:
void batch_prepare(int64_t decree);
// Add PUT record in batch write.
- // \returns 0 if success, non-0 if failure.
+ // \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);
// Add REMOVE record in batch write.
- // \returns 0 if success, non-0 if failure.
+ // \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);
// Commit batch write.
- // \returns 0 if success, non-0 if failure.
- // NOTE that if the batch contains no updates, 0 is returned.
+ // \returns rocksdb::Status::Code.
+ // NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned.
int batch_commit(int64_t decree);
// Abort batch write.
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index e0310d741..b75b379a3 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -36,6 +36,7 @@ namespace pegasus {
namespace server {
/// internal error codes used for fail injection
+// TODO(yingchun): Use real rocksdb::Status::code.
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
@@ -99,12 +100,11 @@ public:
int err =
_rocksdb_wrapper->write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
- if (err) {
+ if (err != rocksdb::Status::kOk) {
return err;
}
- err = _rocksdb_wrapper->write(decree);
- return err;
+ return _rocksdb_wrapper->write(decree);
}
int multi_put(const db_write_context &ctx,
@@ -170,7 +170,7 @@ public:
}
resp.error = _rocksdb_wrapper->write(decree);
- if (resp.error == 0) {
+ if (resp.error == rocksdb::Status::kOk) {
resp.count = update.sort_keys.size();
}
return resp.error;
@@ -188,7 +188,7 @@ public:
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
- if (err != 0) {
+ if (err != rocksdb::Status::kOk) {
resp.error = err;
return err;
}
@@ -252,7 +252,7 @@ public:
}
resp.error = _rocksdb_wrapper->write(decree);
- if (resp.error == 0) {
+ if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}
return resp.error;
@@ -283,7 +283,7 @@ public:
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
- if (err != 0) {
+ if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, "
"check_sort_key: {}",
@@ -352,7 +352,7 @@ public:
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}
- return 0;
+ return rocksdb::Status::kOk;
}
int check_and_mutate(int64_t decree,
@@ -404,7 +404,7 @@ public:
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
- if (err != 0) {
+ if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, "
"check_sort_key: {}",
@@ -475,7 +475,7 @@ public:
resp.error =
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}
- return 0;
+ return rocksdb::Status::kOk;
}
// \return ERR_INVALID_VERSION: replay or commit out-date ingest request
@@ -508,7 +508,7 @@ public:
// ingest external files
if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
- 0)) {
+ rocksdb::Status::kOk)) {
return dsn::ERR_INGESTION_FAILED;
}
return dsn::ERR_OK;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 574b00a84..c5872ec13 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -115,7 +115,7 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
db_get_context get_ctx;
int err = get(raw_key, &get_ctx);
- if (dsn_unlikely(err != 0)) {
+ if (dsn_unlikely(err != rocksdb::Status::kOk)) {
return err;
}
// if record exists and is not expired.
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 05b73dc76..e3c713512 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -53,9 +53,10 @@ public:
rocksdb_wrapper(pegasus_server_impl *server);
/// Calls RocksDB Get and store the result into `db_get_context`.
- /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
- /// \result ctx.expired=true if record expired. Still 0 is returned.
- /// \result ctx.found=false if record is not found. Still 0 is returned.
+ /// \returns rocksdb::Status::kOk if Get succeeded. On failure, a non-zero rocksdb status code
+ /// is returned.
+ /// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk is returned.
+ /// \result ctx.found=false if record is not found. Still rocksdb::Status::kOk is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
int write_batch_put(int64_t decree,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org