You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2021/10/27 08:41:38 UTC
[incubator-pegasus] 02/03: fix: `full_scan` can't scan data
completely in some occasions (#825)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch v2.3
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 90fea54d1dd95f90c7a7ba510ac290dee538bf7b
Author: cauchy1988 <ta...@163.com>
AuthorDate: Wed Oct 13 18:04:27 2021 +0800
fix: `full_scan` can't scan data completely in some occasions (#825)
---
src/base/rrdb_types.cpp | 32 ++++++++++++++++++++++++++++++--
src/client_lib/pegasus_client_impl.cpp | 5 +++--
src/client_lib/pegasus_client_impl.h | 7 +++++--
src/client_lib/pegasus_scanner_impl.cpp | 13 +++++++++----
src/idl/rrdb.thrift | 1 +
src/include/rrdb/rrdb_types.h | 19 +++++++++++++++----
src/server/pegasus_server_impl.cpp | 2 +-
7 files changed, 64 insertions(+), 15 deletions(-)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 5e0885e..9eaa7e7 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -3712,6 +3712,12 @@ void get_scanner_request::__set_return_expire_ts(const bool val)
__isset.return_expire_ts = true;
}
+void get_scanner_request::__set_full_scan(const bool val)
+{
+ this->full_scan = val;
+ __isset.full_scan = true;
+}
+
uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
{
@@ -3831,6 +3837,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 13:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->full_scan);
+ this->__isset.full_scan = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -3902,6 +3916,11 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
xfer += oprot->writeBool(this->return_expire_ts);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.full_scan) {
+ xfer += oprot->writeFieldBegin("full_scan", ::apache::thrift::protocol::T_BOOL, 13);
+ xfer += oprot->writeBool(this->full_scan);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -3922,6 +3941,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
swap(a.validate_partition_hash, b.validate_partition_hash);
swap(a.return_expire_ts, b.return_expire_ts);
+ swap(a.full_scan, b.full_scan);
swap(a.__isset, b.__isset);
}
@@ -3939,6 +3959,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
sort_key_filter_pattern = other108.sort_key_filter_pattern;
validate_partition_hash = other108.validate_partition_hash;
return_expire_ts = other108.return_expire_ts;
+ full_scan = other108.full_scan;
__isset = other108.__isset;
}
get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3955,6 +3976,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
validate_partition_hash = std::move(other109.validate_partition_hash);
return_expire_ts = std::move(other109.return_expire_ts);
+ full_scan = std::move(other109.full_scan);
__isset = std::move(other109.__isset);
}
get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3971,6 +3993,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
sort_key_filter_pattern = other110.sort_key_filter_pattern;
validate_partition_hash = other110.validate_partition_hash;
return_expire_ts = other110.return_expire_ts;
+ full_scan = other110.full_scan;
__isset = other110.__isset;
return *this;
}
@@ -3988,6 +4011,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
validate_partition_hash = std::move(other111.validate_partition_hash);
return_expire_ts = std::move(other111.return_expire_ts);
+ full_scan = std::move(other111.full_scan);
__isset = std::move(other111.__isset);
return *this;
}
@@ -4021,6 +4045,9 @@ void get_scanner_request::printTo(std::ostream &out) const
out << ", "
<< "return_expire_ts=";
(__isset.return_expire_ts ? (out << to_string(return_expire_ts)) : (out << "<null>"));
+ out << ", "
+ << "full_scan=";
+ (__isset.full_scan ? (out << to_string(full_scan)) : (out << "<null>"));
out << ")";
}
@@ -4674,5 +4701,6 @@ void duplicate_response::printTo(std::ostream &out) const
(__isset.error_hint ? (out << to_string(error_hint)) : (out << "<null>"));
out << ")";
}
-}
-} // namespace
+
+} // namespace apps
+} // namespace dsn
diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp
index 826bf4e..9e58942 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -1179,7 +1179,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
v.push_back(pegasus_key_hash(start));
}
- scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false);
+ scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false, false);
return PERR_OK;
}
@@ -1223,7 +1223,8 @@ void pegasus_client_impl::async_get_unordered_scanners(
std::vector<uint64_t> hash(s);
for (int j = 0; j < s; j++)
hash[j] = --count;
- scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true);
+ scanners[i] =
+ new pegasus_scanner_impl(_client, std::move(hash), options, true, true);
}
}
}
diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h
index be33e88..adbbc71 100644
--- a/src/client_lib/pegasus_client_impl.h
+++ b/src/client_lib/pegasus_client_impl.h
@@ -266,13 +266,15 @@ public:
pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
const scan_options &options,
- bool validate_partition_hash);
+ bool validate_partition_hash,
+ bool full_scan);
pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
const scan_options &options,
const ::dsn::blob &start_key,
const ::dsn::blob &stop_key,
- bool validate_partition_hash);
+ bool validate_partition_hash,
+ bool full_scan);
private:
::dsn::apps::rrdb_client *_client;
@@ -291,6 +293,7 @@ public:
std::list<async_scan_next_callback_t> _queue;
volatile bool _rpc_started;
bool _validate_partition_hash;
+ bool _full_scan;
void _async_next_internal();
void _start_scan();
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index 487bdd5..ce9baa5 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -29,8 +29,10 @@ namespace client {
pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
const scan_options &options,
- bool validate_partition_hash)
- : pegasus_scanner_impl(client, std::move(hash), options, _min, _max, validate_partition_hash)
+ bool validate_partition_hash,
+ bool full_scan)
+ : pegasus_scanner_impl(
+ client, std::move(hash), options, _min, _max, validate_partition_hash, full_scan)
{
_options.start_inclusive = true;
_options.stop_inclusive = false;
@@ -41,7 +43,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
const scan_options &options,
const ::dsn::blob &start_key,
const ::dsn::blob &stop_key,
- bool validate_partition_hash)
+ bool validate_partition_hash,
+ bool full_scan)
: _client(client),
_start_key(start_key),
_stop_key(stop_key),
@@ -50,7 +53,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_p(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
- _validate_partition_hash(validate_partition_hash)
+ _validate_partition_hash(validate_partition_hash),
+ _full_scan(full_scan)
{
}
@@ -225,6 +229,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.no_value = _options.no_value;
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
+ req.__set_full_scan(_full_scan);
dassert(!_rpc_started, "");
_rpc_started = true;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 125efbb..b81483b 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -255,6 +255,7 @@ struct get_scanner_request
10:dsn.blob sort_key_filter_pattern;
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
+ 13:optional bool full_scan; // true means the client wants to build a 'full scan' context with the server side, false otherwise
}
struct scan_request
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 15f1730..205216a 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1568,7 +1568,8 @@ typedef struct _get_scanner_request__isset
sort_key_filter_type(false),
sort_key_filter_pattern(false),
validate_partition_hash(false),
- return_expire_ts(false)
+ return_expire_ts(false),
+ full_scan(false)
{
}
bool start_key : 1;
@@ -1583,6 +1584,7 @@ typedef struct _get_scanner_request__isset
bool sort_key_filter_pattern : 1;
bool validate_partition_hash : 1;
bool return_expire_ts : 1;
+ bool full_scan : 1;
} _get_scanner_request__isset;
class get_scanner_request
@@ -1600,7 +1602,8 @@ public:
hash_key_filter_type((filter_type::type)0),
sort_key_filter_type((filter_type::type)0),
validate_partition_hash(0),
- return_expire_ts(0)
+ return_expire_ts(0),
+ full_scan(0)
{
}
@@ -1617,6 +1620,7 @@ public:
::dsn::blob sort_key_filter_pattern;
bool validate_partition_hash;
bool return_expire_ts;
+ bool full_scan;
_get_scanner_request__isset __isset;
@@ -1644,6 +1648,8 @@ public:
void __set_return_expire_ts(const bool val);
+ void __set_full_scan(const bool val);
+
bool operator==(const get_scanner_request &rhs) const
{
if (!(start_key == rhs.start_key))
@@ -1675,6 +1681,10 @@ public:
return false;
else if (__isset.return_expire_ts && !(return_expire_ts == rhs.return_expire_ts))
return false;
+ if (__isset.full_scan != rhs.__isset.full_scan)
+ return false;
+ else if (__isset.full_scan && !(full_scan == rhs.full_scan))
+ return false;
return true;
}
bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
@@ -1967,7 +1977,8 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj
obj.printTo(out);
return out;
}
-}
-} // namespace
+
+} // namespace apps
+} // namespace dsn
#endif
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 9375bbe..34ef06e 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -945,7 +945,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
if (_data_cf_opts.prefix_extractor) {
::dsn::blob start_hash_key, tmp;
pegasus_restore_key(request.start_key, start_hash_key, tmp);
- if (start_hash_key.size() == 0) {
+ if (start_hash_key.size() == 0 || request.full_scan) {
// hash_key is not passed, only happened when do full scan (scanners got by
// get_unordered_scanners) on a partition, we have to do total order seek on rocksDB.
rd_opts.total_order_seek = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org