You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by GitBox <gi...@apache.org> on 2022/02/14 05:47:33 UTC

[GitHub] [incubator-pegasus] acelyc111 commented on a change in pull request #897: feat: add 'BATCH_GET' interface for read optimization

acelyc111 commented on a change in pull request #897:
URL: https://github.com/apache/incubator-pegasus/pull/897#discussion_r805516379



##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,121 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+    dassert(_is_open, "");
+    _pfc_batch_get_qps->increment();
+    int64_t start_time = dsn_now_ns();
+
+    auto &response = rpc.response();
+    response.app_id = _gpid.get_app_id();
+    response.partition_index = _gpid.get_partition_index();
+    response.server = _primary_address;
+
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
+    const auto &request = rpc.request();
+    if (request.keys.empty()) {
+        response.error = rocksdb::Status::kInvalidArgument;
+        derror_replica("Invalid argument for batch_get from {}: 'keys' field in request is empty",
+                       rpc.remote_address().to_string());
+        _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
+        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+        return;
+    }
+
+    std::vector<rocksdb::Slice> keys;
+    keys.reserve(request.keys.size());
+    std::vector<::dsn::blob> keys_holder;
+    keys_holder.reserve(request.keys.size());
+    for (const auto &key : request.keys) {
+        ::dsn::blob raw_key;

Review comment:
       Replace all `::dsn` with `dsn`.

##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,121 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+    dassert(_is_open, "");
+    _pfc_batch_get_qps->increment();
+    int64_t start_time = dsn_now_ns();
+
+    auto &response = rpc.response();
+    response.app_id = _gpid.get_app_id();
+    response.partition_index = _gpid.get_partition_index();
+    response.server = _primary_address;
+
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
+    const auto &request = rpc.request();
+    if (request.keys.empty()) {
+        response.error = rocksdb::Status::kInvalidArgument;
+        derror_replica("Invalid argument for batch_get from {}: 'keys' field in request is empty",
+                       rpc.remote_address().to_string());
+        _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
+        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+        return;
+    }
+
+    std::vector<rocksdb::Slice> keys;
+    keys.reserve(request.keys.size());
+    std::vector<::dsn::blob> keys_holder;
+    keys_holder.reserve(request.keys.size());
+    for (const auto &key : request.keys) {
+        ::dsn::blob raw_key;
+        pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+        keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+        keys_holder.emplace_back(std::move(raw_key));
+    }
+
+    rocksdb::Status final_status;
+    bool error_occurred = false;
+    int64_t total_data_size = 0;
+    uint32_t epoch_now = pegasus::utils::epoch_now();
+    std::vector<std::string> values;
+    std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
+    response.data.reserve(request.keys.size());
+    for (int i = 0; i < keys.size(); i++) {
+        const auto &status = statuses[i];
+        const ::dsn::blob &hash_key = request.keys[i].hash_key;
+        const ::dsn::blob &sort_key = request.keys[i].sort_key;
+
+        if (status.IsNotFound()) {
+            continue;
+        }
+
+        std::string &value = values[i];
+
+        if (dsn_likely(status.ok())) {
+            if (check_if_record_expired(epoch_now, value)) {
+                if (_verbose_log) {
+                    derror_replica("rocksdb data expired for batch_get from {}",

Review comment:
       Since you are logging here anyway, you can explicitly log which hash key and sort key have expired.

##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,121 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+    dassert(_is_open, "");
+    _pfc_batch_get_qps->increment();
+    int64_t start_time = dsn_now_ns();
+
+    auto &response = rpc.response();
+    response.app_id = _gpid.get_app_id();
+    response.partition_index = _gpid.get_partition_index();
+    response.server = _primary_address;
+
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
+    const auto &request = rpc.request();
+    if (request.keys.empty()) {
+        response.error = rocksdb::Status::kInvalidArgument;
+        derror_replica("Invalid argument for batch_get from {}: 'keys' field in request is empty",
+                       rpc.remote_address().to_string());
+        _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
+        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+        return;
+    }
+
+    std::vector<rocksdb::Slice> keys;
+    keys.reserve(request.keys.size());
+    std::vector<::dsn::blob> keys_holder;
+    keys_holder.reserve(request.keys.size());
+    for (const auto &key : request.keys) {
+        ::dsn::blob raw_key;
+        pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+        keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+        keys_holder.emplace_back(std::move(raw_key));
+    }
+
+    rocksdb::Status final_status;
+    bool error_occurred = false;
+    int64_t total_data_size = 0;
+    uint32_t epoch_now = pegasus::utils::epoch_now();
+    std::vector<std::string> values;
+    std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
+    response.data.reserve(request.keys.size());
+    for (int i = 0; i < keys.size(); i++) {
+        const auto &status = statuses[i];
+        const ::dsn::blob &hash_key = request.keys[i].hash_key;
+        const ::dsn::blob &sort_key = request.keys[i].sort_key;
+
+        if (status.IsNotFound()) {
+            continue;
+        }
+
+        std::string &value = values[i];
+
+        if (dsn_likely(status.ok())) {
+            if (check_if_record_expired(epoch_now, value)) {
+                if (_verbose_log) {
+                    derror_replica("rocksdb data expired for batch_get from {}",
+                                   rpc.remote_address().to_string());
+                }
+                continue;
+            }
+
+            dsn::blob real_value;
+            pegasus_extract_user_data(_pegasus_data_version, std::move(value), real_value);
+            dsn::apps::full_data current_data;
+            current_data.hash_key = hash_key;
+            current_data.sort_key = sort_key;
+            current_data.value = std::move(real_value);
+            total_data_size += current_data.value.size();
+            response.data.emplace_back(std::move(current_data));
+        } else {
+            if (_verbose_log) {
+                derror_replica(
+                    "rocksdb get failed for batch_get from {}:  error = {}, key size = {}",
+                    replica_name(),

Review comment:
       You can remove this line; `derror_replica` already includes the replica name implicitly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org