You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/27 08:13:00 UTC

[incubator-pegasus] branch master updated: refactor: return Status::code instead of meanless integer (#1417)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 58bad7471 refactor: return Status::code instead of meanless integer (#1417)
58bad7471 is described below

commit 58bad7471dc2f9f3ed716ad475750bd0ea0f53f9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Mon Mar 27 16:12:53 2023 +0800

    refactor: return Status::code instead of meanless integer (#1417)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    To handle the return code of read and write requests, it would be great
    to refactor the return code of the related functions.
    This patch change to use rocksdb::Status::code insteadn of meanless integer,
    and left some TODOs to be dealt in follow up patchs.
---
 src/common/storage_serverlet.h          |  2 ++
 src/replica/replica.cpp                 |  1 +
 src/replica/replication_app_base.cpp    | 14 +++++++-------
 src/replica/replication_app_base.h      |  5 +++--
 src/server/pegasus_read_service.h       |  8 ++------
 src/server/pegasus_server_write.cpp     | 14 ++++++++------
 src/server/pegasus_server_write.h       |  2 +-
 src/server/pegasus_write_service.cpp    |  4 ++--
 src/server/pegasus_write_service.h      |  8 ++++----
 src/server/pegasus_write_service_impl.h | 22 +++++++++++-----------
 src/server/rocksdb_wrapper.cpp          |  2 +-
 src/server/rocksdb_wrapper.h            |  7 ++++---
 12 files changed, 46 insertions(+), 43 deletions(-)

diff --git a/src/common/storage_serverlet.h b/src/common/storage_serverlet.h
index 8eb7c67e0..30fa56287 100644
--- a/src/common/storage_serverlet.h
+++ b/src/common/storage_serverlet.h
@@ -122,11 +122,13 @@ protected:
         return nullptr;
     }
 
+    // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
     int handle_request(dsn::message_ex *request)
     {
         dsn::task_code t = request->rpc_code();
         const rpc_handler *ptr = find_handler(t);
         if (ptr != nullptr) {
+            // TODO(yingchun): add return value
             (*ptr)(static_cast<T *>(this), request);
         } else {
             LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ",
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index ef363b2fe..be2cbd309 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -299,6 +299,7 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
 
     uint64_t start_time_ns = dsn_now_ns();
     CHECK(_app, "");
+    // TODO(yingchun): check the return value.
     _app->on_request(request);
 
     // If the corresponding perf counter exist, count the duration of this operation.
diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp
index 596a4e674..5ae07b90b 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -26,6 +26,7 @@
 
 #include <alloca.h>
 #include <fcntl.h>
+#include <rocksdb/status.h>
 #include <algorithm>
 #include <fstream>
 #include <memory>
@@ -373,12 +374,11 @@ int replication_app_base::on_batched_write_requests(int64_t decree,
                                                     message_ex **requests,
                                                     int request_length)
 {
-    int storage_error = 0;
+    int storage_error = rocksdb::Status::kOk;
     for (int i = 0; i < request_length; ++i) {
-        // TODO(yingchun): better to return error_code
         int e = on_request(requests[i]);
-        if (e != 0) {
-            LOG_ERROR_PREFIX("got storage error when handler request({})",
+        if (e != rocksdb::Status::kOk) {
+            LOG_ERROR_PREFIX("got storage engine error when handler request({})",
                              requests[i]->header->rpc_name);
             storage_error = e;
         }
@@ -425,7 +425,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
         }
     }
 
-    int perror = on_batched_write_requests(
+    int storage_error = on_batched_write_requests(
         mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count);
 
     // release faked requests
@@ -433,8 +433,8 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
         faked_requests[i]->release_ref();
     }
 
-    if (perror != 0) {
-        LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
+    if (storage_error != rocksdb::Status::kOk) {
+        LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), storage_error);
         // For normal write requests, if got rocksdb error, this replica will be set error and evoke
         // learn.
         // For ingestion requests, should not do as normal write requests, there are two reasons:
diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h
index 9e9257e95..e0e292d6d 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -215,7 +215,7 @@ public:
     //
     virtual replication::decree last_durable_decree() const = 0;
     virtual replication::decree last_flushed_decree() const { return last_durable_decree(); }
-    // return the error generated by storage engine
+    // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
     virtual int on_request(message_ex *request) = 0;
 
     //
@@ -225,8 +225,9 @@ public:
     // The base class gives a naive implementation that just call on_request
     // repeatedly. Storage engine may override this function to get better performance.
     //
+    // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
     virtual int on_batched_write_requests(int64_t decree,
-                                          uint64_t timstamp,
+                                          uint64_t timestamp,
                                           message_ex **requests,
                                           int request_length);
 
diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index 697274d83..163a529d3 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -42,12 +42,8 @@ public:
     pegasus_read_service(dsn::replication::replica *r) : dsn::replication::replication_app_base(r)
     {
     }
-    virtual ~pegasus_read_service() {}
-    virtual int on_request(dsn::message_ex *request) override
-    {
-        handle_request(request);
-        return 0;
-    }
+
+    int on_request(dsn::message_ex *request) override { return handle_request(request); }
 
 protected:
     // all service handlers to be implemented further
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 8daacc48b..0147050a6 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <rocksdb/status.h>
 #include <stdio.h>
 #include <thrift/transport/TTransportException.h>
 #include <algorithm>
@@ -84,7 +85,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
         LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
                          requests[0]->header->from_address.to_string(),
                          ex.what());
-        return 0;
+        return rocksdb::Status::kOk;
     }
 
     return on_batched_writes(requests, count);
@@ -94,7 +95,7 @@ void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_defau
 
 int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
 {
-    int err = 0;
+    int err = rocksdb::Status::kOk;
     {
         _write_svc->batch_prepare(_decree);
 
@@ -104,7 +105,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
             // Make sure all writes are batched even if they are failed,
             // since we need to record the total qps and rpc latencies,
             // and respond for all RPCs regardless of their result.
-            int local_err = 0;
+            int local_err = rocksdb::Status::kOk;
             try {
                 dsn::task_code rpc_code(requests[i]->rpc_code());
                 if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
@@ -130,13 +131,14 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
                                  ex.what());
             }
 
-            if (!err && local_err) {
+            if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
                 err = local_err;
             }
         }
 
-        if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
-            _write_svc->batch_abort(_decree, err == 0 ? -1 : err);
+        if (dsn_unlikely(err != rocksdb::Status::kOk ||
+                         (_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
+            _write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : err);
         } else {
             err = _write_svc->batch_commit(_decree);
         }
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index 7bf35b347..6a002c299 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -52,7 +52,7 @@ public:
     /// Error returned is regarded as the failure of replica, thus will trigger
     /// cluster membership changes. Make sure no error is returned because of
     /// invalid user argument.
-    /// As long as the returned error is 0, the operation is guaranteed to be
+    /// As long as the returned error is rocksdb::Status::kOk, the operation is guaranteed to be
     /// successfully applied into rocksdb, which means an empty_put will be called
     /// even if there's no write.
     int on_batched_write_requests(dsn::message_ex **requests,
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index dd82e205d..73f6cc8d6 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -386,7 +386,7 @@ int pegasus_write_service::duplicate(int64_t decree,
         remove_rpc remove;
         if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
             request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
-            int err = 0;
+            int err = rocksdb::Status::kOk;
             if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
                 put = put_rpc(write);
                 err = _impl->batch_put(ctx, put.request(), put.response());
@@ -422,7 +422,7 @@ int pegasus_write_service::ingest_files(int64_t decree,
     resp.err = dsn::ERR_OK;
     // write empty put to flush decree
     resp.rocksdb_error = empty_put(decree);
-    if (resp.rocksdb_error != 0) {
+    if (resp.rocksdb_error != rocksdb::Status::kOk) {
         resp.err = dsn::ERR_TRY_AGAIN;
         return resp.rocksdb_error;
     }
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 4136f02b6..9fb854ffd 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -176,20 +176,20 @@ public:
     void batch_prepare(int64_t decree);
 
     // Add PUT record in batch write.
-    // \returns 0 if success, non-0 if failure.
+    // \returns rocksdb::Status::Code.
     // NOTE that `resp` should not be moved or freed while the batch is not committed.
     int batch_put(const db_write_context &ctx,
                   const dsn::apps::update_request &update,
                   dsn::apps::update_response &resp);
 
     // Add REMOVE record in batch write.
-    // \returns 0 if success, non-0 if failure.
+    // \returns rocksdb::Status::Code.
     // NOTE that `resp` should not be moved or freed while the batch is not committed.
     int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);
 
     // Commit batch write.
-    // \returns 0 if success, non-0 if failure.
-    // NOTE that if the batch contains no updates, 0 is returned.
+    // \returns rocksdb::Status::Code.
+    // NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned.
     int batch_commit(int64_t decree);
 
     // Abort batch write.
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index e0310d741..b75b379a3 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -36,6 +36,7 @@ namespace pegasus {
 namespace server {
 
 /// internal error codes used for fail injection
+// TODO(yingchun): Use real rocksdb::Status::code.
 static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
 static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
 static constexpr int FAIL_DB_WRITE = -103;
@@ -99,12 +100,11 @@ public:
         int err =
             _rocksdb_wrapper->write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
         auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
-        if (err) {
+        if (err != rocksdb::Status::kOk) {
             return err;
         }
 
-        err = _rocksdb_wrapper->write(decree);
-        return err;
+        return _rocksdb_wrapper->write(decree);
     }
 
     int multi_put(const db_write_context &ctx,
@@ -170,7 +170,7 @@ public:
         }
 
         resp.error = _rocksdb_wrapper->write(decree);
-        if (resp.error == 0) {
+        if (resp.error == rocksdb::Status::kOk) {
             resp.count = update.sort_keys.size();
         }
         return resp.error;
@@ -188,7 +188,7 @@ public:
         uint32_t new_expire_ts = 0;
         db_get_context get_ctx;
         int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
-        if (err != 0) {
+        if (err != rocksdb::Status::kOk) {
             resp.error = err;
             return err;
         }
@@ -252,7 +252,7 @@ public:
         }
 
         resp.error = _rocksdb_wrapper->write(decree);
-        if (resp.error == 0) {
+        if (resp.error == rocksdb::Status::kOk) {
             resp.new_value = new_value;
         }
         return resp.error;
@@ -283,7 +283,7 @@ public:
         db_get_context get_context;
         dsn::string_view check_raw_key(check_key.data(), check_key.length());
         int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
-        if (err != 0) {
+        if (err != rocksdb::Status::kOk) {
             // read check value failed
             LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, "
                               "check_sort_key: {}",
@@ -352,7 +352,7 @@ public:
                 invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
         }
 
-        return 0;
+        return rocksdb::Status::kOk;
     }
 
     int check_and_mutate(int64_t decree,
@@ -404,7 +404,7 @@ public:
         db_get_context get_context;
         dsn::string_view check_raw_key(check_key.data(), check_key.length());
         int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
-        if (err != 0) {
+        if (err != rocksdb::Status::kOk) {
             // read check value failed
             LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, "
                               "check_sort_key: {}",
@@ -475,7 +475,7 @@ public:
             resp.error =
                 invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
         }
-        return 0;
+        return rocksdb::Status::kOk;
     }
 
     // \return ERR_INVALID_VERSION: replay or commit out-date ingest request
@@ -508,7 +508,7 @@ public:
 
         // ingest external files
         if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
-                         0)) {
+                         rocksdb::Status::kOk)) {
             return dsn::ERR_INGESTION_FAILED;
         }
         return dsn::ERR_OK;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 574b00a84..c5872ec13 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -115,7 +115,7 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
 
         db_get_context get_ctx;
         int err = get(raw_key, &get_ctx);
-        if (dsn_unlikely(err != 0)) {
+        if (dsn_unlikely(err != rocksdb::Status::kOk)) {
             return err;
         }
         // if record exists and is not expired.
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 05b73dc76..e3c713512 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -53,9 +53,10 @@ public:
     rocksdb_wrapper(pegasus_server_impl *server);
 
     /// Calls RocksDB Get and store the result into `db_get_context`.
-    /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
-    /// \result ctx.expired=true if record expired. Still 0 is returned.
-    /// \result ctx.found=false if record is not found. Still 0 is returned.
+    /// \returns rocksdb::Status::kOk if Get succeeded. On failure, a non-zero rocksdb status code
+    /// is returned.
+    /// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk is returned.
+    /// \result ctx.found=false if record is not found. Still rocksdb::Status::kOk is returned.
     int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
 
     int write_batch_put(int64_t decree,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org