You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/03/31 07:05:08 UTC

[incubator-pegasus] branch master updated: feat(split): scan support validate_partition_hash (#702)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e35518  feat(split): scan support validate_partition_hash (#702)
8e35518 is described below

commit 8e3551816882860e5bb951650951690d25fcb88b
Author: HeYuchen <37...@qq.com>
AuthorDate: Wed Mar 31 15:05:01 2021 +0800

    feat(split): scan support validate_partition_hash (#702)
---
 src/base/rrdb_types.cpp                 |  48 ++++----
 src/client_lib/pegasus_client_impl.cpp  |   4 +-
 src/client_lib/pegasus_client_impl.h    |   7 +-
 src/client_lib/pegasus_scanner_impl.cpp |  12 +-
 src/idl/rrdb.thrift                     |   1 +
 src/include/rrdb/rrdb_types.h           |  34 +++---
 src/server/pegasus_scan_context.h       |   7 +-
 src/server/pegasus_server_impl.cpp      | 189 +++++++++++++++++++-------------
 src/server/pegasus_server_impl.h        |  51 +++++----
 9 files changed, 206 insertions(+), 147 deletions(-)

diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 9e48521..42c6778 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /**
  * Autogenerated by Thrift Compiler (0.9.3)
  *
@@ -3692,6 +3673,12 @@ void get_scanner_request::__set_sort_key_filter_pattern(const ::dsn::blob &val)
     this->sort_key_filter_pattern = val;
 }
 
+void get_scanner_request::__set_validate_partition_hash(const bool val)
+{
+    this->validate_partition_hash = val;
+    __isset.validate_partition_hash = true;
+}
+
 uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
 {
 
@@ -3795,6 +3782,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
                 xfer += iprot->skip(ftype);
             }
             break;
+        case 11:
+            if (ftype == ::apache::thrift::protocol::T_BOOL) {
+                xfer += iprot->readBool(this->validate_partition_hash);
+                this->__isset.validate_partition_hash = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
         default:
             xfer += iprot->skip(ftype);
             break;
@@ -3855,6 +3850,12 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
     xfer += this->sort_key_filter_pattern.write(oprot);
     xfer += oprot->writeFieldEnd();
 
+    if (this->__isset.validate_partition_hash) {
+        xfer += oprot->writeFieldBegin(
+            "validate_partition_hash", ::apache::thrift::protocol::T_BOOL, 11);
+        xfer += oprot->writeBool(this->validate_partition_hash);
+        xfer += oprot->writeFieldEnd();
+    }
     xfer += oprot->writeFieldStop();
     xfer += oprot->writeStructEnd();
     return xfer;
@@ -3873,6 +3874,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
     swap(a.hash_key_filter_pattern, b.hash_key_filter_pattern);
     swap(a.sort_key_filter_type, b.sort_key_filter_type);
     swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
+    swap(a.validate_partition_hash, b.validate_partition_hash);
     swap(a.__isset, b.__isset);
 }
 
@@ -3888,6 +3890,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
     hash_key_filter_pattern = other108.hash_key_filter_pattern;
     sort_key_filter_type = other108.sort_key_filter_type;
     sort_key_filter_pattern = other108.sort_key_filter_pattern;
+    validate_partition_hash = other108.validate_partition_hash;
     __isset = other108.__isset;
 }
 get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3902,6 +3905,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
     hash_key_filter_pattern = std::move(other109.hash_key_filter_pattern);
     sort_key_filter_type = std::move(other109.sort_key_filter_type);
     sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
+    validate_partition_hash = std::move(other109.validate_partition_hash);
     __isset = std::move(other109.__isset);
 }
 get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3916,6 +3920,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
     hash_key_filter_pattern = other110.hash_key_filter_pattern;
     sort_key_filter_type = other110.sort_key_filter_type;
     sort_key_filter_pattern = other110.sort_key_filter_pattern;
+    validate_partition_hash = other110.validate_partition_hash;
     __isset = other110.__isset;
     return *this;
 }
@@ -3931,6 +3936,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
     hash_key_filter_pattern = std::move(other111.hash_key_filter_pattern);
     sort_key_filter_type = std::move(other111.sort_key_filter_type);
     sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
+    validate_partition_hash = std::move(other111.validate_partition_hash);
     __isset = std::move(other111.__isset);
     return *this;
 }
@@ -3957,6 +3963,10 @@ void get_scanner_request::printTo(std::ostream &out) const
         << "sort_key_filter_type=" << to_string(sort_key_filter_type);
     out << ", "
         << "sort_key_filter_pattern=" << to_string(sort_key_filter_pattern);
+    out << ", "
+        << "validate_partition_hash=";
+    (__isset.validate_partition_hash ? (out << to_string(validate_partition_hash))
+                                     : (out << "<null>"));
     out << ")";
 }
 
diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp
index 3e76b99..b0d7f39 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -1179,7 +1179,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
     if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
         v.push_back(pegasus_key_hash(start));
     }
-    scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop);
+    scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false);
 
     return PERR_OK;
 }
@@ -1223,7 +1223,7 @@ void pegasus_client_impl::async_get_unordered_scanners(
                     std::vector<uint64_t> hash(s);
                     for (int j = 0; j < s; j++)
                         hash[j] = --count;
-                    scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options);
+                    scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true);
                 }
             }
         }
diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h
index 77f53a9..be33e88 100644
--- a/src/client_lib/pegasus_client_impl.h
+++ b/src/client_lib/pegasus_client_impl.h
@@ -265,12 +265,14 @@ public:
 
         pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                              std::vector<uint64_t> &&hash,
-                             const scan_options &options);
+                             const scan_options &options,
+                             bool validate_partition_hash);
         pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                              std::vector<uint64_t> &&hash,
                              const scan_options &options,
                              const ::dsn::blob &start_key,
-                             const ::dsn::blob &stop_key);
+                             const ::dsn::blob &stop_key,
+                             bool validate_partition_hash);
 
     private:
         ::dsn::apps::rrdb_client *_client;
@@ -288,6 +290,7 @@ public:
         mutable ::dsn::zlock _lock;
         std::list<async_scan_next_callback_t> _queue;
         volatile bool _rpc_started;
+        bool _validate_partition_hash;
 
         void _async_next_internal();
         void _start_scan();
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index 928a932..d264c3a 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -28,8 +28,9 @@ namespace client {
 
 pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                                                                 std::vector<uint64_t> &&hash,
-                                                                const scan_options &options)
-    : pegasus_scanner_impl(client, std::move(hash), options, _min, _max)
+                                                                const scan_options &options,
+                                                                bool validate_partition_hash)
+    : pegasus_scanner_impl(client, std::move(hash), options, _min, _max, validate_partition_hash)
 {
     _options.start_inclusive = true;
     _options.stop_inclusive = false;
@@ -39,7 +40,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
                                                                 std::vector<uint64_t> &&hash,
                                                                 const scan_options &options,
                                                                 const ::dsn::blob &start_key,
-                                                                const ::dsn::blob &stop_key)
+                                                                const ::dsn::blob &stop_key,
+                                                                bool validate_partition_hash)
     : _client(client),
       _start_key(start_key),
       _stop_key(stop_key),
@@ -47,7 +49,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
       _splits_hash(std::move(hash)),
       _p(-1),
       _context(SCAN_CONTEXT_ID_COMPLETED),
-      _rpc_started(false)
+      _rpc_started(false),
+      _validate_partition_hash(validate_partition_hash)
 {
 }
 
@@ -211,6 +214,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
     req.sort_key_filter_pattern = ::dsn::blob(
         _options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
     req.no_value = _options.no_value;
+    req.__set_validate_partition_hash(_validate_partition_hash);
 
     dassert(!_rpc_started, "");
     _rpc_started = true;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 02207a9..2b12273 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -252,6 +252,7 @@ struct get_scanner_request
     8:dsn.blob     hash_key_filter_pattern;
     9:filter_type  sort_key_filter_type;
     10:dsn.blob    sort_key_filter_pattern;
+    11:optional bool    validate_partition_hash;
 }
 
 struct scan_request
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index a9bb3b8..a9c512e 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /**
  * Autogenerated by Thrift Compiler (0.9.3)
  *
@@ -1577,7 +1558,8 @@ typedef struct _get_scanner_request__isset
           hash_key_filter_type(false),
           hash_key_filter_pattern(false),
           sort_key_filter_type(false),
-          sort_key_filter_pattern(false)
+          sort_key_filter_pattern(false),
+          validate_partition_hash(false)
     {
     }
     bool start_key : 1;
@@ -1590,6 +1572,7 @@ typedef struct _get_scanner_request__isset
     bool hash_key_filter_pattern : 1;
     bool sort_key_filter_type : 1;
     bool sort_key_filter_pattern : 1;
+    bool validate_partition_hash : 1;
 } _get_scanner_request__isset;
 
 class get_scanner_request
@@ -1605,7 +1588,8 @@ public:
           batch_size(0),
           no_value(0),
           hash_key_filter_type((filter_type::type)0),
-          sort_key_filter_type((filter_type::type)0)
+          sort_key_filter_type((filter_type::type)0),
+          validate_partition_hash(0)
     {
     }
 
@@ -1620,6 +1604,7 @@ public:
     ::dsn::blob hash_key_filter_pattern;
     filter_type::type sort_key_filter_type;
     ::dsn::blob sort_key_filter_pattern;
+    bool validate_partition_hash;
 
     _get_scanner_request__isset __isset;
 
@@ -1643,6 +1628,8 @@ public:
 
     void __set_sort_key_filter_pattern(const ::dsn::blob &val);
 
+    void __set_validate_partition_hash(const bool val);
+
     bool operator==(const get_scanner_request &rhs) const
     {
         if (!(start_key == rhs.start_key))
@@ -1665,6 +1652,11 @@ public:
             return false;
         if (!(sort_key_filter_pattern == rhs.sort_key_filter_pattern))
             return false;
+        if (__isset.validate_partition_hash != rhs.__isset.validate_partition_hash)
+            return false;
+        else if (__isset.validate_partition_hash &&
+                 !(validate_partition_hash == rhs.validate_partition_hash))
+            return false;
         return true;
     }
     bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
diff --git a/src/server/pegasus_scan_context.h b/src/server/pegasus_scan_context.h
index a9c24fb..da3cd08 100644
--- a/src/server/pegasus_scan_context.h
+++ b/src/server/pegasus_scan_context.h
@@ -41,7 +41,8 @@ struct pegasus_scan_context
                          ::dsn::apps::filter_type::type sort_key_filter_type_,
                          const std::string &&sort_key_filter_pattern_,
                          int32_t batch_size_,
-                         bool no_value_)
+                         bool no_value_,
+                         bool validate_partition_hash_)
         : _stop_holder(std::move(stop_)),
           _hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
           _sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
@@ -55,7 +56,8 @@ struct pegasus_scan_context
           sort_key_filter_pattern(
               _sort_key_filter_pattern_holder.data(), 0, _sort_key_filter_pattern_holder.length()),
           batch_size(batch_size_),
-          no_value(no_value_)
+          no_value(no_value_),
+          validate_partition_hash(validate_partition_hash_)
     {
     }
 
@@ -74,6 +76,7 @@ public:
     dsn::blob sort_key_filter_pattern;
     int32_t batch_size;
     bool no_value;
+    bool validate_partition_hash;
 };
 
 class pegasus_context_cache
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 7816bcd..4f9fbe5 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -483,23 +483,30 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                 limiter->add_count();
 
                 // extract value
-                int r = append_key_value_for_multi_get(resp.kvs,
-                                                       it->key(),
-                                                       it->value(),
-                                                       request.sort_key_filter_type,
-                                                       request.sort_key_filter_pattern,
-                                                       epoch_now,
-                                                       request.no_value);
-                if (r == 1) {
+                auto state = append_key_value_for_multi_get(resp.kvs,
+                                                            it->key(),
+                                                            it->value(),
+                                                            request.sort_key_filter_type,
+                                                            request.sort_key_filter_pattern,
+                                                            epoch_now,
+                                                            request.no_value);
+
+                switch (state) {
+                case range_iteration_state::kNormal: {
                     count++;
                     auto &kv = resp.kvs.back();
                     uint64_t kv_size = kv.key.length() + kv.value.length();
                     size += kv_size;
                     limiter->add_size(kv_size);
-                } else if (r == 2) {
+                } break;
+                case range_iteration_state::kExpired:
                     expire_count++;
-                } else { // r == 3
+                    break;
+                case range_iteration_state::kFiltered:
                     filter_count++;
+                    break;
+                default:
+                    break;
                 }
 
                 if (c == 0) {
@@ -548,23 +555,29 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                 limiter->add_count();
 
                 // extract value
-                int r = append_key_value_for_multi_get(reverse_kvs,
-                                                       it->key(),
-                                                       it->value(),
-                                                       request.sort_key_filter_type,
-                                                       request.sort_key_filter_pattern,
-                                                       epoch_now,
-                                                       request.no_value);
-                if (r == 1) {
+                auto state = append_key_value_for_multi_get(reverse_kvs,
+                                                            it->key(),
+                                                            it->value(),
+                                                            request.sort_key_filter_type,
+                                                            request.sort_key_filter_pattern,
+                                                            epoch_now,
+                                                            request.no_value);
+                switch (state) {
+                case range_iteration_state::kNormal: {
                     count++;
                     auto &kv = reverse_kvs.back();
                     uint64_t kv_size = kv.key.length() + kv.value.length();
                     size += kv_size;
                     limiter->add_size(kv_size);
-                } else if (r == 2) {
+                } break;
+                case range_iteration_state::kExpired:
                     expire_count++;
-                } else { // r == 3
+                    break;
+                case range_iteration_state::kFiltered:
                     filter_count++;
+                    break;
+                default:
+                    break;
                 }
 
                 if (c == 0) {
@@ -1016,21 +1029,29 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
 
         limiter->add_count();
 
-        int r = append_key_value_for_scan(resp.kvs,
-                                          it->key(),
-                                          it->value(),
-                                          request.hash_key_filter_type,
-                                          request.hash_key_filter_pattern,
-                                          request.sort_key_filter_type,
-                                          request.sort_key_filter_pattern,
-                                          epoch_now,
-                                          request.no_value);
-        if (r == 1) {
+        auto state = append_key_value_for_scan(
+            resp.kvs,
+            it->key(),
+            it->value(),
+            request.hash_key_filter_type,
+            request.hash_key_filter_pattern,
+            request.sort_key_filter_type,
+            request.sort_key_filter_pattern,
+            epoch_now,
+            request.no_value,
+            request.__isset.validate_partition_hash ? request.validate_partition_hash : true);
+        switch (state) {
+        case range_iteration_state::kNormal:
             count++;
-        } else if (r == 2) {
+            break;
+        case range_iteration_state::kExpired:
             expire_count++;
-        } else { // r == 3
+            break;
+        case range_iteration_state::kFiltered:
             filter_count++;
+            break;
+        default:
+            break;
         }
 
         if (c == 0) {
@@ -1081,18 +1102,19 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
                       limiter->max_duration_time());
     } else if (it->Valid() && !complete) {
         // scan not completed
-        std::unique_ptr<pegasus_scan_context> context(
-            new pegasus_scan_context(std::move(it),
-                                     std::string(stop.data(), stop.size()),
-                                     request.stop_inclusive,
-                                     request.hash_key_filter_type,
-                                     std::string(request.hash_key_filter_pattern.data(),
-                                                 request.hash_key_filter_pattern.length()),
-                                     request.sort_key_filter_type,
-                                     std::string(request.sort_key_filter_pattern.data(),
-                                                 request.sort_key_filter_pattern.length()),
-                                     batch_count,
-                                     request.no_value));
+        std::unique_ptr<pegasus_scan_context> context(new pegasus_scan_context(
+            std::move(it),
+            std::string(stop.data(), stop.size()),
+            request.stop_inclusive,
+            request.hash_key_filter_type,
+            std::string(request.hash_key_filter_pattern.data(),
+                        request.hash_key_filter_pattern.length()),
+            request.sort_key_filter_type,
+            std::string(request.sort_key_filter_pattern.data(),
+                        request.sort_key_filter_pattern.length()),
+            batch_count,
+            request.no_value,
+            request.__isset.validate_partition_hash ? request.validate_partition_hash : true));
         int64_t handle = _context_cache.put(std::move(context));
         resp.context_id = handle;
         // if the context is used, it will be fetched and re-put into cache,
@@ -1140,6 +1162,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
         const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
         bool no_value = context->no_value;
+        bool validate_hash = context->validate_partition_hash;
         bool complete = false;
         uint32_t epoch_now = ::pegasus::utils::epoch_now();
         uint64_t expire_count = 0;
@@ -1163,21 +1186,28 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
 
             limiter->add_count();
 
-            int r = append_key_value_for_scan(resp.kvs,
-                                              it->key(),
-                                              it->value(),
-                                              hash_key_filter_type,
-                                              hash_key_filter_pattern,
-                                              sort_key_filter_type,
-                                              sort_key_filter_pattern,
-                                              epoch_now,
-                                              no_value);
-            if (r == 1) {
+            auto state = append_key_value_for_scan(resp.kvs,
+                                                   it->key(),
+                                                   it->value(),
+                                                   hash_key_filter_type,
+                                                   hash_key_filter_pattern,
+                                                   sort_key_filter_type,
+                                                   sort_key_filter_pattern,
+                                                   epoch_now,
+                                                   no_value,
+                                                   validate_hash);
+            switch (state) {
+            case range_iteration_state::kNormal:
                 count++;
-            } else if (r == 2) {
+                break;
+            case range_iteration_state::kExpired:
                 expire_count++;
-            } else { // r == 3
+                break;
+            case range_iteration_state::kFiltered:
                 filter_count++;
+                break;
+            default:
+                break;
             }
 
             if (c == 0) {
@@ -2046,22 +2076,33 @@ bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_
     return false;
 }
 
-int pegasus_server_impl::append_key_value_for_scan(
-    std::vector<::dsn::apps::key_value> &kvs,
-    const rocksdb::Slice &key,
-    const rocksdb::Slice &value,
-    ::dsn::apps::filter_type::type hash_key_filter_type,
-    const ::dsn::blob &hash_key_filter_pattern,
-    ::dsn::apps::filter_type::type sort_key_filter_type,
-    const ::dsn::blob &sort_key_filter_pattern,
-    uint32_t epoch_now,
-    bool no_value)
+range_iteration_state
+pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
+                                               const rocksdb::Slice &key,
+                                               const rocksdb::Slice &value,
+                                               ::dsn::apps::filter_type::type hash_key_filter_type,
+                                               const ::dsn::blob &hash_key_filter_pattern,
+                                               ::dsn::apps::filter_type::type sort_key_filter_type,
+                                               const ::dsn::blob &sort_key_filter_pattern,
+                                               uint32_t epoch_now,
+                                               bool no_value,
+                                               bool request_validate_hash)
 {
     if (check_if_record_expired(epoch_now, value)) {
         if (_verbose_log) {
             derror("%s: rocksdb data expired for scan", replica_name());
         }
-        return 2;
+        return range_iteration_state::kExpired;
+    }
+
+    if (request_validate_hash && _validate_partition_hash) {
+        if (_partition_version < 0 || _gpid.get_partition_index() > _partition_version ||
+            !check_pegasus_key_hash(key, _gpid.get_partition_index(), _partition_version)) {
+            if (_verbose_log) {
+                derror_replica("not serve hash key while scan");
+            }
+            return range_iteration_state::kHashInvalid;
+        }
     }
 
     ::dsn::apps::key_value kv;
@@ -2077,14 +2118,14 @@ int pegasus_server_impl::append_key_value_for_scan(
             if (_verbose_log) {
                 derror("%s: hash key filtered for scan", replica_name());
             }
-            return 3;
+            return range_iteration_state::kFiltered;
         }
         if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
             !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
             if (_verbose_log) {
                 derror("%s: sort key filtered for scan", replica_name());
             }
-            return 3;
+            return range_iteration_state::kFiltered;
         }
     }
     std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
@@ -2098,10 +2139,10 @@ int pegasus_server_impl::append_key_value_for_scan(
     }
 
     kvs.emplace_back(std::move(kv));
-    return 1;
+    return range_iteration_state::kNormal;
 }
 
-int pegasus_server_impl::append_key_value_for_multi_get(
+range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
     std::vector<::dsn::apps::key_value> &kvs,
     const rocksdb::Slice &key,
     const rocksdb::Slice &value,
@@ -2114,7 +2155,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
         if (_verbose_log) {
             derror("%s: rocksdb data expired for multi get", replica_name());
         }
-        return 2;
+        return range_iteration_state::kExpired;
     }
 
     ::dsn::apps::key_value kv;
@@ -2128,7 +2169,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
         if (_verbose_log) {
             derror("%s: sort key filtered for multi get", replica_name());
         }
-        return 3;
+        return range_iteration_state::kFiltered;
     }
     std::shared_ptr<char> sort_key_buf(::dsn::utils::make_shared_array<char>(sort_key.length()));
     ::memcpy(sort_key_buf.get(), sort_key.data(), sort_key.length());
@@ -2141,7 +2182,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
     }
 
     kvs.emplace_back(std::move(kv));
-    return 1;
+    return range_iteration_state::kNormal;
 }
 
 void pegasus_server_impl::update_replica_rocksdb_statistics()
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 2b314c3..4a26577 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -45,6 +45,14 @@ class capacity_unit_calculator;
 class pegasus_server_write;
 class hotkey_collector;
 
+enum class range_iteration_state
+{
+    kNormal = 1,
+    kExpired,
+    kFiltered,
+    kHashInvalid
+};
+
 class pegasus_server_impl : public pegasus_read_service
 {
 public:
@@ -203,29 +211,26 @@ private:
 
     void set_last_durable_decree(int64_t decree) { _last_durable_decree.store(decree); }
 
-    // return 1 if value is appended
-    // return 2 if value is expired
-    // return 3 if value is filtered
-    int append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
-                                  const rocksdb::Slice &key,
-                                  const rocksdb::Slice &value,
-                                  ::dsn::apps::filter_type::type hash_key_filter_type,
-                                  const ::dsn::blob &hash_key_filter_pattern,
-                                  ::dsn::apps::filter_type::type sort_key_filter_type,
-                                  const ::dsn::blob &sort_key_filter_pattern,
-                                  uint32_t epoch_now,
-                                  bool no_value);
-
-    // return 1 if value is appended
-    // return 2 if value is expired
-    // return 3 if value is filtered
-    int append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
-                                       const rocksdb::Slice &key,
-                                       const rocksdb::Slice &value,
-                                       ::dsn::apps::filter_type::type sort_key_filter_type,
-                                       const ::dsn::blob &sort_key_filter_pattern,
-                                       uint32_t epoch_now,
-                                       bool no_value);
+    range_iteration_state
+    append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
+                              const rocksdb::Slice &key,
+                              const rocksdb::Slice &value,
+                              ::dsn::apps::filter_type::type hash_key_filter_type,
+                              const ::dsn::blob &hash_key_filter_pattern,
+                              ::dsn::apps::filter_type::type sort_key_filter_type,
+                              const ::dsn::blob &sort_key_filter_pattern,
+                              uint32_t epoch_now,
+                              bool no_value,
+                              bool request_validate_hash);
+
+    range_iteration_state
+    append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
+                                   const rocksdb::Slice &key,
+                                   const rocksdb::Slice &value,
+                                   ::dsn::apps::filter_type::type sort_key_filter_type,
+                                   const ::dsn::blob &sort_key_filter_pattern,
+                                   uint32_t epoch_now,
+                                   bool no_value);
 
     // return true if the filter type is supported
     bool is_filter_type_supported(::dsn::apps::filter_type::type filter_type)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org