You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/03/31 07:05:08 UTC
[incubator-pegasus] branch master updated: feat(split): scan
support validate_partition_hash (#702)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 8e35518 feat(split): scan support validate_partition_hash (#702)
8e35518 is described below
commit 8e3551816882860e5bb951650951690d25fcb88b
Author: HeYuchen <37...@qq.com>
AuthorDate: Wed Mar 31 15:05:01 2021 +0800
feat(split): scan support validate_partition_hash (#702)
---
src/base/rrdb_types.cpp | 48 ++++----
src/client_lib/pegasus_client_impl.cpp | 4 +-
src/client_lib/pegasus_client_impl.h | 7 +-
src/client_lib/pegasus_scanner_impl.cpp | 12 +-
src/idl/rrdb.thrift | 1 +
src/include/rrdb/rrdb_types.h | 34 +++---
src/server/pegasus_scan_context.h | 7 +-
src/server/pegasus_server_impl.cpp | 189 +++++++++++++++++++-------------
src/server/pegasus_server_impl.h | 51 +++++----
9 files changed, 206 insertions(+), 147 deletions(-)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 9e48521..42c6778 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
/**
* Autogenerated by Thrift Compiler (0.9.3)
*
@@ -3692,6 +3673,12 @@ void get_scanner_request::__set_sort_key_filter_pattern(const ::dsn::blob &val)
this->sort_key_filter_pattern = val;
}
+void get_scanner_request::__set_validate_partition_hash(const bool val)
+{
+ this->validate_partition_hash = val;
+ __isset.validate_partition_hash = true;
+}
+
uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
{
@@ -3795,6 +3782,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 11:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->validate_partition_hash);
+ this->__isset.validate_partition_hash = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -3855,6 +3850,12 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
xfer += this->sort_key_filter_pattern.write(oprot);
xfer += oprot->writeFieldEnd();
+ if (this->__isset.validate_partition_hash) {
+ xfer += oprot->writeFieldBegin(
+ "validate_partition_hash", ::apache::thrift::protocol::T_BOOL, 11);
+ xfer += oprot->writeBool(this->validate_partition_hash);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -3873,6 +3874,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
swap(a.hash_key_filter_pattern, b.hash_key_filter_pattern);
swap(a.sort_key_filter_type, b.sort_key_filter_type);
swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
+ swap(a.validate_partition_hash, b.validate_partition_hash);
swap(a.__isset, b.__isset);
}
@@ -3888,6 +3890,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
hash_key_filter_pattern = other108.hash_key_filter_pattern;
sort_key_filter_type = other108.sort_key_filter_type;
sort_key_filter_pattern = other108.sort_key_filter_pattern;
+ validate_partition_hash = other108.validate_partition_hash;
__isset = other108.__isset;
}
get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3902,6 +3905,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
hash_key_filter_pattern = std::move(other109.hash_key_filter_pattern);
sort_key_filter_type = std::move(other109.sort_key_filter_type);
sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
+ validate_partition_hash = std::move(other109.validate_partition_hash);
__isset = std::move(other109.__isset);
}
get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3916,6 +3920,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
hash_key_filter_pattern = other110.hash_key_filter_pattern;
sort_key_filter_type = other110.sort_key_filter_type;
sort_key_filter_pattern = other110.sort_key_filter_pattern;
+ validate_partition_hash = other110.validate_partition_hash;
__isset = other110.__isset;
return *this;
}
@@ -3931,6 +3936,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
hash_key_filter_pattern = std::move(other111.hash_key_filter_pattern);
sort_key_filter_type = std::move(other111.sort_key_filter_type);
sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
+ validate_partition_hash = std::move(other111.validate_partition_hash);
__isset = std::move(other111.__isset);
return *this;
}
@@ -3957,6 +3963,10 @@ void get_scanner_request::printTo(std::ostream &out) const
<< "sort_key_filter_type=" << to_string(sort_key_filter_type);
out << ", "
<< "sort_key_filter_pattern=" << to_string(sort_key_filter_pattern);
+ out << ", "
+ << "validate_partition_hash=";
+ (__isset.validate_partition_hash ? (out << to_string(validate_partition_hash))
+ : (out << "<null>"));
out << ")";
}
diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp
index 3e76b99..b0d7f39 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -1179,7 +1179,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
v.push_back(pegasus_key_hash(start));
}
- scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop);
+ scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false);
return PERR_OK;
}
@@ -1223,7 +1223,7 @@ void pegasus_client_impl::async_get_unordered_scanners(
std::vector<uint64_t> hash(s);
for (int j = 0; j < s; j++)
hash[j] = --count;
- scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options);
+ scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true);
}
}
}
diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h
index 77f53a9..be33e88 100644
--- a/src/client_lib/pegasus_client_impl.h
+++ b/src/client_lib/pegasus_client_impl.h
@@ -265,12 +265,14 @@ public:
pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
- const scan_options &options);
+ const scan_options &options,
+ bool validate_partition_hash);
pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
const scan_options &options,
const ::dsn::blob &start_key,
- const ::dsn::blob &stop_key);
+ const ::dsn::blob &stop_key,
+ bool validate_partition_hash);
private:
::dsn::apps::rrdb_client *_client;
@@ -288,6 +290,7 @@ public:
mutable ::dsn::zlock _lock;
std::list<async_scan_next_callback_t> _queue;
volatile bool _rpc_started;
+ bool _validate_partition_hash;
void _async_next_internal();
void _start_scan();
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index 928a932..d264c3a 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -28,8 +28,9 @@ namespace client {
pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
- const scan_options &options)
- : pegasus_scanner_impl(client, std::move(hash), options, _min, _max)
+ const scan_options &options,
+ bool validate_partition_hash)
+ : pegasus_scanner_impl(client, std::move(hash), options, _min, _max, validate_partition_hash)
{
_options.start_inclusive = true;
_options.stop_inclusive = false;
@@ -39,7 +40,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
std::vector<uint64_t> &&hash,
const scan_options &options,
const ::dsn::blob &start_key,
- const ::dsn::blob &stop_key)
+ const ::dsn::blob &stop_key,
+ bool validate_partition_hash)
: _client(client),
_start_key(start_key),
_stop_key(stop_key),
@@ -47,7 +49,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_splits_hash(std::move(hash)),
_p(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
- _rpc_started(false)
+ _rpc_started(false),
+ _validate_partition_hash(validate_partition_hash)
{
}
@@ -211,6 +214,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.sort_key_filter_pattern = ::dsn::blob(
_options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
req.no_value = _options.no_value;
+ req.__set_validate_partition_hash(_validate_partition_hash);
dassert(!_rpc_started, "");
_rpc_started = true;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 02207a9..2b12273 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -252,6 +252,7 @@ struct get_scanner_request
8:dsn.blob hash_key_filter_pattern;
9:filter_type sort_key_filter_type;
10:dsn.blob sort_key_filter_pattern;
+ 11:optional bool validate_partition_hash;
}
struct scan_request
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index a9bb3b8..a9c512e 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1,22 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
/**
* Autogenerated by Thrift Compiler (0.9.3)
*
@@ -1577,7 +1558,8 @@ typedef struct _get_scanner_request__isset
hash_key_filter_type(false),
hash_key_filter_pattern(false),
sort_key_filter_type(false),
- sort_key_filter_pattern(false)
+ sort_key_filter_pattern(false),
+ validate_partition_hash(false)
{
}
bool start_key : 1;
@@ -1590,6 +1572,7 @@ typedef struct _get_scanner_request__isset
bool hash_key_filter_pattern : 1;
bool sort_key_filter_type : 1;
bool sort_key_filter_pattern : 1;
+ bool validate_partition_hash : 1;
} _get_scanner_request__isset;
class get_scanner_request
@@ -1605,7 +1588,8 @@ public:
batch_size(0),
no_value(0),
hash_key_filter_type((filter_type::type)0),
- sort_key_filter_type((filter_type::type)0)
+ sort_key_filter_type((filter_type::type)0),
+ validate_partition_hash(0)
{
}
@@ -1620,6 +1604,7 @@ public:
::dsn::blob hash_key_filter_pattern;
filter_type::type sort_key_filter_type;
::dsn::blob sort_key_filter_pattern;
+ bool validate_partition_hash;
_get_scanner_request__isset __isset;
@@ -1643,6 +1628,8 @@ public:
void __set_sort_key_filter_pattern(const ::dsn::blob &val);
+ void __set_validate_partition_hash(const bool val);
+
bool operator==(const get_scanner_request &rhs) const
{
if (!(start_key == rhs.start_key))
@@ -1665,6 +1652,11 @@ public:
return false;
if (!(sort_key_filter_pattern == rhs.sort_key_filter_pattern))
return false;
+ if (__isset.validate_partition_hash != rhs.__isset.validate_partition_hash)
+ return false;
+ else if (__isset.validate_partition_hash &&
+ !(validate_partition_hash == rhs.validate_partition_hash))
+ return false;
return true;
}
bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
diff --git a/src/server/pegasus_scan_context.h b/src/server/pegasus_scan_context.h
index a9c24fb..da3cd08 100644
--- a/src/server/pegasus_scan_context.h
+++ b/src/server/pegasus_scan_context.h
@@ -41,7 +41,8 @@ struct pegasus_scan_context
::dsn::apps::filter_type::type sort_key_filter_type_,
const std::string &&sort_key_filter_pattern_,
int32_t batch_size_,
- bool no_value_)
+ bool no_value_,
+ bool validate_partition_hash_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
@@ -55,7 +56,8 @@ struct pegasus_scan_context
sort_key_filter_pattern(
_sort_key_filter_pattern_holder.data(), 0, _sort_key_filter_pattern_holder.length()),
batch_size(batch_size_),
- no_value(no_value_)
+ no_value(no_value_),
+ validate_partition_hash(validate_partition_hash_)
{
}
@@ -74,6 +76,7 @@ public:
dsn::blob sort_key_filter_pattern;
int32_t batch_size;
bool no_value;
+ bool validate_partition_hash;
};
class pegasus_context_cache
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 7816bcd..4f9fbe5 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -483,23 +483,30 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
limiter->add_count();
// extract value
- int r = append_key_value_for_multi_get(resp.kvs,
- it->key(),
- it->value(),
- request.sort_key_filter_type,
- request.sort_key_filter_pattern,
- epoch_now,
- request.no_value);
- if (r == 1) {
+ auto state = append_key_value_for_multi_get(resp.kvs,
+ it->key(),
+ it->value(),
+ request.sort_key_filter_type,
+ request.sort_key_filter_pattern,
+ epoch_now,
+ request.no_value);
+
+ switch (state) {
+ case range_iteration_state::kNormal: {
count++;
auto &kv = resp.kvs.back();
uint64_t kv_size = kv.key.length() + kv.value.length();
size += kv_size;
limiter->add_size(kv_size);
- } else if (r == 2) {
+ } break;
+ case range_iteration_state::kExpired:
expire_count++;
- } else { // r == 3
+ break;
+ case range_iteration_state::kFiltered:
filter_count++;
+ break;
+ default:
+ break;
}
if (c == 0) {
@@ -548,23 +555,29 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
limiter->add_count();
// extract value
- int r = append_key_value_for_multi_get(reverse_kvs,
- it->key(),
- it->value(),
- request.sort_key_filter_type,
- request.sort_key_filter_pattern,
- epoch_now,
- request.no_value);
- if (r == 1) {
+ auto state = append_key_value_for_multi_get(reverse_kvs,
+ it->key(),
+ it->value(),
+ request.sort_key_filter_type,
+ request.sort_key_filter_pattern,
+ epoch_now,
+ request.no_value);
+ switch (state) {
+ case range_iteration_state::kNormal: {
count++;
auto &kv = reverse_kvs.back();
uint64_t kv_size = kv.key.length() + kv.value.length();
size += kv_size;
limiter->add_size(kv_size);
- } else if (r == 2) {
+ } break;
+ case range_iteration_state::kExpired:
expire_count++;
- } else { // r == 3
+ break;
+ case range_iteration_state::kFiltered:
filter_count++;
+ break;
+ default:
+ break;
}
if (c == 0) {
@@ -1016,21 +1029,29 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
limiter->add_count();
- int r = append_key_value_for_scan(resp.kvs,
- it->key(),
- it->value(),
- request.hash_key_filter_type,
- request.hash_key_filter_pattern,
- request.sort_key_filter_type,
- request.sort_key_filter_pattern,
- epoch_now,
- request.no_value);
- if (r == 1) {
+ auto state = append_key_value_for_scan(
+ resp.kvs,
+ it->key(),
+ it->value(),
+ request.hash_key_filter_type,
+ request.hash_key_filter_pattern,
+ request.sort_key_filter_type,
+ request.sort_key_filter_pattern,
+ epoch_now,
+ request.no_value,
+ request.__isset.validate_partition_hash ? request.validate_partition_hash : true);
+ switch (state) {
+ case range_iteration_state::kNormal:
count++;
- } else if (r == 2) {
+ break;
+ case range_iteration_state::kExpired:
expire_count++;
- } else { // r == 3
+ break;
+ case range_iteration_state::kFiltered:
filter_count++;
+ break;
+ default:
+ break;
}
if (c == 0) {
@@ -1081,18 +1102,19 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
limiter->max_duration_time());
} else if (it->Valid() && !complete) {
// scan not completed
- std::unique_ptr<pegasus_scan_context> context(
- new pegasus_scan_context(std::move(it),
- std::string(stop.data(), stop.size()),
- request.stop_inclusive,
- request.hash_key_filter_type,
- std::string(request.hash_key_filter_pattern.data(),
- request.hash_key_filter_pattern.length()),
- request.sort_key_filter_type,
- std::string(request.sort_key_filter_pattern.data(),
- request.sort_key_filter_pattern.length()),
- batch_count,
- request.no_value));
+ std::unique_ptr<pegasus_scan_context> context(new pegasus_scan_context(
+ std::move(it),
+ std::string(stop.data(), stop.size()),
+ request.stop_inclusive,
+ request.hash_key_filter_type,
+ std::string(request.hash_key_filter_pattern.data(),
+ request.hash_key_filter_pattern.length()),
+ request.sort_key_filter_type,
+ std::string(request.sort_key_filter_pattern.data(),
+ request.sort_key_filter_pattern.length()),
+ batch_count,
+ request.no_value,
+ request.__isset.validate_partition_hash ? request.validate_partition_hash : true));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used, it will be fetched and re-put into cache,
@@ -1140,6 +1162,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
bool no_value = context->no_value;
+ bool validate_hash = context->validate_partition_hash;
bool complete = false;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;
@@ -1163,21 +1186,28 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
limiter->add_count();
- int r = append_key_value_for_scan(resp.kvs,
- it->key(),
- it->value(),
- hash_key_filter_type,
- hash_key_filter_pattern,
- sort_key_filter_type,
- sort_key_filter_pattern,
- epoch_now,
- no_value);
- if (r == 1) {
+ auto state = append_key_value_for_scan(resp.kvs,
+ it->key(),
+ it->value(),
+ hash_key_filter_type,
+ hash_key_filter_pattern,
+ sort_key_filter_type,
+ sort_key_filter_pattern,
+ epoch_now,
+ no_value,
+ validate_hash);
+ switch (state) {
+ case range_iteration_state::kNormal:
count++;
- } else if (r == 2) {
+ break;
+ case range_iteration_state::kExpired:
expire_count++;
- } else { // r == 3
+ break;
+ case range_iteration_state::kFiltered:
filter_count++;
+ break;
+ default:
+ break;
}
if (c == 0) {
@@ -2046,22 +2076,33 @@ bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_
return false;
}
-int pegasus_server_impl::append_key_value_for_scan(
- std::vector<::dsn::apps::key_value> &kvs,
- const rocksdb::Slice &key,
- const rocksdb::Slice &value,
- ::dsn::apps::filter_type::type hash_key_filter_type,
- const ::dsn::blob &hash_key_filter_pattern,
- ::dsn::apps::filter_type::type sort_key_filter_type,
- const ::dsn::blob &sort_key_filter_pattern,
- uint32_t epoch_now,
- bool no_value)
+range_iteration_state
+pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
+ const rocksdb::Slice &key,
+ const rocksdb::Slice &value,
+ ::dsn::apps::filter_type::type hash_key_filter_type,
+ const ::dsn::blob &hash_key_filter_pattern,
+ ::dsn::apps::filter_type::type sort_key_filter_type,
+ const ::dsn::blob &sort_key_filter_pattern,
+ uint32_t epoch_now,
+ bool no_value,
+ bool request_validate_hash)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
derror("%s: rocksdb data expired for scan", replica_name());
}
- return 2;
+ return range_iteration_state::kExpired;
+ }
+
+ if (request_validate_hash && _validate_partition_hash) {
+ if (_partition_version < 0 || _gpid.get_partition_index() > _partition_version ||
+ !check_pegasus_key_hash(key, _gpid.get_partition_index(), _partition_version)) {
+ if (_verbose_log) {
+ derror_replica("not serve hash key while scan");
+ }
+ return range_iteration_state::kHashInvalid;
+ }
}
::dsn::apps::key_value kv;
@@ -2077,14 +2118,14 @@ int pegasus_server_impl::append_key_value_for_scan(
if (_verbose_log) {
derror("%s: hash key filtered for scan", replica_name());
}
- return 3;
+ return range_iteration_state::kFiltered;
}
if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
if (_verbose_log) {
derror("%s: sort key filtered for scan", replica_name());
}
- return 3;
+ return range_iteration_state::kFiltered;
}
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
@@ -2098,10 +2139,10 @@ int pegasus_server_impl::append_key_value_for_scan(
}
kvs.emplace_back(std::move(kv));
- return 1;
+ return range_iteration_state::kNormal;
}
-int pegasus_server_impl::append_key_value_for_multi_get(
+range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
@@ -2114,7 +2155,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
if (_verbose_log) {
derror("%s: rocksdb data expired for multi get", replica_name());
}
- return 2;
+ return range_iteration_state::kExpired;
}
::dsn::apps::key_value kv;
@@ -2128,7 +2169,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
if (_verbose_log) {
derror("%s: sort key filtered for multi get", replica_name());
}
- return 3;
+ return range_iteration_state::kFiltered;
}
std::shared_ptr<char> sort_key_buf(::dsn::utils::make_shared_array<char>(sort_key.length()));
::memcpy(sort_key_buf.get(), sort_key.data(), sort_key.length());
@@ -2141,7 +2182,7 @@ int pegasus_server_impl::append_key_value_for_multi_get(
}
kvs.emplace_back(std::move(kv));
- return 1;
+ return range_iteration_state::kNormal;
}
void pegasus_server_impl::update_replica_rocksdb_statistics()
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 2b314c3..4a26577 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -45,6 +45,14 @@ class capacity_unit_calculator;
class pegasus_server_write;
class hotkey_collector;
+enum class range_iteration_state
+{
+ kNormal = 1,
+ kExpired,
+ kFiltered,
+ kHashInvalid
+};
+
class pegasus_server_impl : public pegasus_read_service
{
public:
@@ -203,29 +211,26 @@ private:
void set_last_durable_decree(int64_t decree) { _last_durable_decree.store(decree); }
- // return 1 if value is appended
- // return 2 if value is expired
- // return 3 if value is filtered
- int append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
- const rocksdb::Slice &key,
- const rocksdb::Slice &value,
- ::dsn::apps::filter_type::type hash_key_filter_type,
- const ::dsn::blob &hash_key_filter_pattern,
- ::dsn::apps::filter_type::type sort_key_filter_type,
- const ::dsn::blob &sort_key_filter_pattern,
- uint32_t epoch_now,
- bool no_value);
-
- // return 1 if value is appended
- // return 2 if value is expired
- // return 3 if value is filtered
- int append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
- const rocksdb::Slice &key,
- const rocksdb::Slice &value,
- ::dsn::apps::filter_type::type sort_key_filter_type,
- const ::dsn::blob &sort_key_filter_pattern,
- uint32_t epoch_now,
- bool no_value);
+ range_iteration_state
+ append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
+ const rocksdb::Slice &key,
+ const rocksdb::Slice &value,
+ ::dsn::apps::filter_type::type hash_key_filter_type,
+ const ::dsn::blob &hash_key_filter_pattern,
+ ::dsn::apps::filter_type::type sort_key_filter_type,
+ const ::dsn::blob &sort_key_filter_pattern,
+ uint32_t epoch_now,
+ bool no_value,
+ bool request_validate_hash);
+
+ range_iteration_state
+ append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
+ const rocksdb::Slice &key,
+ const rocksdb::Slice &value,
+ ::dsn::apps::filter_type::type sort_key_filter_type,
+ const ::dsn::blob &sort_key_filter_pattern,
+ uint32_t epoch_now,
+ bool no_value);
// return true if the filter type is supported
bool is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org