You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2021/10/27 08:41:38 UTC

[incubator-pegasus] 02/03: fix: `full_scan` can't scan data completely in some occasions (#825)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch v2.3
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 90fea54d1dd95f90c7a7ba510ac290dee538bf7b
Author: cauchy1988 <ta...@163.com>
AuthorDate: Wed Oct 13 18:04:27 2021 +0800

    fix: `full_scan` can't scan data completely in some occasions (#825)
---
 src/base/rrdb_types.cpp                 | 32 ++++++++++++++++++++++++++++++--
 src/client_lib/pegasus_client_impl.cpp  |  5 +++--
 src/client_lib/pegasus_client_impl.h    |  7 +++++--
 src/client_lib/pegasus_scanner_impl.cpp | 13 +++++++++----
 src/idl/rrdb.thrift                     |  1 +
 src/include/rrdb/rrdb_types.h           | 19 +++++++++++++++----
 src/server/pegasus_server_impl.cpp      |  2 +-
 7 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 5e0885e..9eaa7e7 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -3712,6 +3712,12 @@ void get_scanner_request::__set_return_expire_ts(const bool val)
     __isset.return_expire_ts = true;
 }
 
+void get_scanner_request::__set_full_scan(const bool val)
+{
+    this->full_scan = val;
+    __isset.full_scan = true;
+}
+
 uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
 {
 
@@ -3831,6 +3837,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
                 xfer += iprot->skip(ftype);
             }
             break;
+        case 13:
+            if (ftype == ::apache::thrift::protocol::T_BOOL) {
+                xfer += iprot->readBool(this->full_scan);
+                this->__isset.full_scan = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
         default:
             xfer += iprot->skip(ftype);
             break;
@@ -3902,6 +3916,11 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
         xfer += oprot->writeBool(this->return_expire_ts);
         xfer += oprot->writeFieldEnd();
     }
+    if (this->__isset.full_scan) {
+        xfer += oprot->writeFieldBegin("full_scan", ::apache::thrift::protocol::T_BOOL, 13);
+        xfer += oprot->writeBool(this->full_scan);
+        xfer += oprot->writeFieldEnd();
+    }
     xfer += oprot->writeFieldStop();
     xfer += oprot->writeStructEnd();
     return xfer;
@@ -3922,6 +3941,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
     swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
     swap(a.validate_partition_hash, b.validate_partition_hash);
     swap(a.return_expire_ts, b.return_expire_ts);
+    swap(a.full_scan, b.full_scan);
     swap(a.__isset, b.__isset);
 }
 
@@ -3939,6 +3959,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
     sort_key_filter_pattern = other108.sort_key_filter_pattern;
     validate_partition_hash = other108.validate_partition_hash;
     return_expire_ts = other108.return_expire_ts;
+    full_scan = other108.full_scan;
     __isset = other108.__isset;
 }
 get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3955,6 +3976,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
     sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
     validate_partition_hash = std::move(other109.validate_partition_hash);
     return_expire_ts = std::move(other109.return_expire_ts);
+    full_scan = std::move(other109.full_scan);
     __isset = std::move(other109.__isset);
 }
 get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3971,6 +3993,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
     sort_key_filter_pattern = other110.sort_key_filter_pattern;
     validate_partition_hash = other110.validate_partition_hash;
     return_expire_ts = other110.return_expire_ts;
+    full_scan = other110.full_scan;
     __isset = other110.__isset;
     return *this;
 }
@@ -3988,6 +4011,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
     sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
     validate_partition_hash = std::move(other111.validate_partition_hash);
     return_expire_ts = std::move(other111.return_expire_ts);
+    full_scan = std::move(other111.full_scan);
     __isset = std::move(other111.__isset);
     return *this;
 }
@@ -4021,6 +4045,9 @@ void get_scanner_request::printTo(std::ostream &out) const
     out << ", "
         << "return_expire_ts=";
     (__isset.return_expire_ts ? (out << to_string(return_expire_ts)) : (out << "<null>"));
+    out << ", "
+        << "full_scan=";
+    (__isset.full_scan ? (out << to_string(full_scan)) : (out << "<null>"));
     out << ")";
 }
 
@@ -4674,5 +4701,6 @@ void duplicate_response::printTo(std::ostream &out) const
     (__isset.error_hint ? (out << to_string(error_hint)) : (out << "<null>"));
     out << ")";
 }
-}
-} // namespace
+
+} // namespace apps
+} // namespace dsn
diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp
index 826bf4e..9e58942 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -1179,7 +1179,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
     if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
         v.push_back(pegasus_key_hash(start));
     }
-    scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false);
+    scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false, false);
 
     return PERR_OK;
 }
@@ -1223,7 +1223,8 @@ void pegasus_client_impl::async_get_unordered_scanners(
                     std::vector<uint64_t> hash(s);
                     for (int j = 0; j < s; j++)
                         hash[j] = --count;
-                    scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true);
+                    scanners[i] =
+                        new pegasus_scanner_impl(_client, std::move(hash), options, true, true);
                 }
             }
         }
diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h
index be33e88..adbbc71 100644
--- a/src/client_lib/pegasus_client_impl.h
+++ b/src/client_lib/pegasus_client_impl.h
@@ -266,13 +266,15 @@ public:
         pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                              std::vector<uint64_t> &&hash,
                              const scan_options &options,
-                             bool validate_partition_hash);
+                             bool validate_partition_hash,
+                             bool full_scan);
         pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                              std::vector<uint64_t> &&hash,
                              const scan_options &options,
                              const ::dsn::blob &start_key,
                              const ::dsn::blob &stop_key,
-                             bool validate_partition_hash);
+                             bool validate_partition_hash,
+                             bool full_scan);
 
     private:
         ::dsn::apps::rrdb_client *_client;
@@ -291,6 +293,7 @@ public:
         std::list<async_scan_next_callback_t> _queue;
         volatile bool _rpc_started;
         bool _validate_partition_hash;
+        bool _full_scan;
 
         void _async_next_internal();
         void _start_scan();
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index 487bdd5..ce9baa5 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -29,8 +29,10 @@ namespace client {
 pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                                                                 std::vector<uint64_t> &&hash,
                                                                 const scan_options &options,
-                                                                bool validate_partition_hash)
-    : pegasus_scanner_impl(client, std::move(hash), options, _min, _max, validate_partition_hash)
+                                                                bool validate_partition_hash,
+                                                                bool full_scan)
+    : pegasus_scanner_impl(
+          client, std::move(hash), options, _min, _max, validate_partition_hash, full_scan)
 {
     _options.start_inclusive = true;
     _options.stop_inclusive = false;
@@ -41,7 +43,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
                                                                 const scan_options &options,
                                                                 const ::dsn::blob &start_key,
                                                                 const ::dsn::blob &stop_key,
-                                                                bool validate_partition_hash)
+                                                                bool validate_partition_hash,
+                                                                bool full_scan)
     : _client(client),
       _start_key(start_key),
       _stop_key(stop_key),
@@ -50,7 +53,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
       _p(-1),
       _context(SCAN_CONTEXT_ID_COMPLETED),
       _rpc_started(false),
-      _validate_partition_hash(validate_partition_hash)
+      _validate_partition_hash(validate_partition_hash),
+      _full_scan(full_scan)
 {
 }
 
@@ -225,6 +229,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
     req.no_value = _options.no_value;
     req.__set_validate_partition_hash(_validate_partition_hash);
     req.__set_return_expire_ts(_options.return_expire_ts);
+    req.__set_full_scan(_full_scan);
 
     dassert(!_rpc_started, "");
     _rpc_started = true;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 125efbb..b81483b 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -255,6 +255,7 @@ struct get_scanner_request
     10:dsn.blob    sort_key_filter_pattern;
     11:optional bool    validate_partition_hash;
     12:optional bool    return_expire_ts;
+    13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
 }
 
 struct scan_request
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 15f1730..205216a 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1568,7 +1568,8 @@ typedef struct _get_scanner_request__isset
           sort_key_filter_type(false),
           sort_key_filter_pattern(false),
           validate_partition_hash(false),
-          return_expire_ts(false)
+          return_expire_ts(false),
+          full_scan(false)
     {
     }
     bool start_key : 1;
@@ -1583,6 +1584,7 @@ typedef struct _get_scanner_request__isset
     bool sort_key_filter_pattern : 1;
     bool validate_partition_hash : 1;
     bool return_expire_ts : 1;
+    bool full_scan : 1;
 } _get_scanner_request__isset;
 
 class get_scanner_request
@@ -1600,7 +1602,8 @@ public:
           hash_key_filter_type((filter_type::type)0),
           sort_key_filter_type((filter_type::type)0),
           validate_partition_hash(0),
-          return_expire_ts(0)
+          return_expire_ts(0),
+          full_scan(0)
     {
     }
 
@@ -1617,6 +1620,7 @@ public:
     ::dsn::blob sort_key_filter_pattern;
     bool validate_partition_hash;
     bool return_expire_ts;
+    bool full_scan;
 
     _get_scanner_request__isset __isset;
 
@@ -1644,6 +1648,8 @@ public:
 
     void __set_return_expire_ts(const bool val);
 
+    void __set_full_scan(const bool val);
+
     bool operator==(const get_scanner_request &rhs) const
     {
         if (!(start_key == rhs.start_key))
@@ -1675,6 +1681,10 @@ public:
             return false;
         else if (__isset.return_expire_ts && !(return_expire_ts == rhs.return_expire_ts))
             return false;
+        if (__isset.full_scan != rhs.__isset.full_scan)
+            return false;
+        else if (__isset.full_scan && !(full_scan == rhs.full_scan))
+            return false;
         return true;
     }
     bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
@@ -1967,7 +1977,8 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj
     obj.printTo(out);
     return out;
 }
-}
-} // namespace
+
+} // namespace apps
+} // namespace dsn
 
 #endif
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 9375bbe..34ef06e 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -945,7 +945,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
     if (_data_cf_opts.prefix_extractor) {
         ::dsn::blob start_hash_key, tmp;
         pegasus_restore_key(request.start_key, start_hash_key, tmp);
-        if (start_hash_key.size() == 0) {
+        if (start_hash_key.size() == 0 || request.full_scan) {
             // hash_key is not passed, only happened when do full scan (scanners got by
             // get_unordered_scanners) on a partition, we have to do total order seek on rocksDB.
             rd_opts.total_order_seek = true;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org