You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/07/06 08:50:53 UTC

[incubator-pegasus] branch master updated: feat: support preserving TTL for copy_data (#752)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d5128fa  feat: support preserving TTL for copy_data (#752)
d5128fa is described below

commit d5128fa5e0813aa73a68f6df02100efdb025721d
Author: Dan Wang <em...@126.com>
AuthorDate: Tue Jul 6 16:50:43 2021 +0800

    feat: support preserving TTL for copy_data (#752)
---
 src/base/rrdb_types.cpp                 |  54 +++++++++++++
 src/client_lib/pegasus_scanner_impl.cpp |  28 +++++--
 src/geo/lib/geo_client.cpp              |   3 +-
 src/idl/rrdb.thrift                     |   2 +
 src/include/pegasus/client.h            |  10 ++-
 src/include/rrdb/rrdb_types.h           |  26 +++++-
 src/server/pegasus_scan_context.h       |   7 +-
 src/server/pegasus_server_impl.cpp      |  22 ++++-
 src/server/pegasus_server_impl.h        |   3 +-
 src/shell/command_helper.h              |  73 +++++++++++------
 src/shell/commands/data_operations.cpp  |   7 +-
 src/shell/main.cpp                      |   3 +-
 src/test/function_test/test_scan.cpp    | 138 +++++++++++++++++++++++++++++++-
 13 files changed, 324 insertions(+), 52 deletions(-)

diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 42c6778..5e0885e 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -929,6 +929,12 @@ void key_value::__set_key(const ::dsn::blob &val) { this->key = val; }
 
 void key_value::__set_value(const ::dsn::blob &val) { this->value = val; }
 
+void key_value::__set_expire_ts_seconds(const int32_t val)
+{
+    this->expire_ts_seconds = val;
+    __isset.expire_ts_seconds = true;
+}
+
 uint32_t key_value::read(::apache::thrift::protocol::TProtocol *iprot)
 {
 
@@ -964,6 +970,14 @@ uint32_t key_value::read(::apache::thrift::protocol::TProtocol *iprot)
                 xfer += iprot->skip(ftype);
             }
             break;
+        case 3:
+            if (ftype == ::apache::thrift::protocol::T_I32) {
+                xfer += iprot->readI32(this->expire_ts_seconds);
+                this->__isset.expire_ts_seconds = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
         default:
             xfer += iprot->skip(ftype);
             break;
@@ -990,6 +1004,11 @@ uint32_t key_value::write(::apache::thrift::protocol::TProtocol *oprot) const
     xfer += this->value.write(oprot);
     xfer += oprot->writeFieldEnd();
 
+    if (this->__isset.expire_ts_seconds) {
+        xfer += oprot->writeFieldBegin("expire_ts_seconds", ::apache::thrift::protocol::T_I32, 3);
+        xfer += oprot->writeI32(this->expire_ts_seconds);
+        xfer += oprot->writeFieldEnd();
+    }
     xfer += oprot->writeFieldStop();
     xfer += oprot->writeStructEnd();
     return xfer;
@@ -1000,6 +1019,7 @@ void swap(key_value &a, key_value &b)
     using ::std::swap;
     swap(a.key, b.key);
     swap(a.value, b.value);
+    swap(a.expire_ts_seconds, b.expire_ts_seconds);
     swap(a.__isset, b.__isset);
 }
 
@@ -1007,18 +1027,21 @@ key_value::key_value(const key_value &other20)
 {
     key = other20.key;
     value = other20.value;
+    expire_ts_seconds = other20.expire_ts_seconds;
     __isset = other20.__isset;
 }
 key_value::key_value(key_value &&other21)
 {
     key = std::move(other21.key);
     value = std::move(other21.value);
+    expire_ts_seconds = std::move(other21.expire_ts_seconds);
     __isset = std::move(other21.__isset);
 }
 key_value &key_value::operator=(const key_value &other22)
 {
     key = other22.key;
     value = other22.value;
+    expire_ts_seconds = other22.expire_ts_seconds;
     __isset = other22.__isset;
     return *this;
 }
@@ -1026,6 +1049,7 @@ key_value &key_value::operator=(key_value &&other23)
 {
     key = std::move(other23.key);
     value = std::move(other23.value);
+    expire_ts_seconds = std::move(other23.expire_ts_seconds);
     __isset = std::move(other23.__isset);
     return *this;
 }
@@ -1036,6 +1060,9 @@ void key_value::printTo(std::ostream &out) const
     out << "key=" << to_string(key);
     out << ", "
         << "value=" << to_string(value);
+    out << ", "
+        << "expire_ts_seconds=";
+    (__isset.expire_ts_seconds ? (out << to_string(expire_ts_seconds)) : (out << "<null>"));
     out << ")";
 }
 
@@ -3679,6 +3706,12 @@ void get_scanner_request::__set_validate_partition_hash(const bool val)
     __isset.validate_partition_hash = true;
 }
 
+void get_scanner_request::__set_return_expire_ts(const bool val)
+{
+    this->return_expire_ts = val;
+    __isset.return_expire_ts = true;
+}
+
 uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
 {
 
@@ -3790,6 +3823,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
                 xfer += iprot->skip(ftype);
             }
             break;
+        case 12:
+            if (ftype == ::apache::thrift::protocol::T_BOOL) {
+                xfer += iprot->readBool(this->return_expire_ts);
+                this->__isset.return_expire_ts = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
         default:
             xfer += iprot->skip(ftype);
             break;
@@ -3856,6 +3897,11 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
         xfer += oprot->writeBool(this->validate_partition_hash);
         xfer += oprot->writeFieldEnd();
     }
+    if (this->__isset.return_expire_ts) {
+        xfer += oprot->writeFieldBegin("return_expire_ts", ::apache::thrift::protocol::T_BOOL, 12);
+        xfer += oprot->writeBool(this->return_expire_ts);
+        xfer += oprot->writeFieldEnd();
+    }
     xfer += oprot->writeFieldStop();
     xfer += oprot->writeStructEnd();
     return xfer;
@@ -3875,6 +3921,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
     swap(a.sort_key_filter_type, b.sort_key_filter_type);
     swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
     swap(a.validate_partition_hash, b.validate_partition_hash);
+    swap(a.return_expire_ts, b.return_expire_ts);
     swap(a.__isset, b.__isset);
 }
 
@@ -3891,6 +3938,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
     sort_key_filter_type = other108.sort_key_filter_type;
     sort_key_filter_pattern = other108.sort_key_filter_pattern;
     validate_partition_hash = other108.validate_partition_hash;
+    return_expire_ts = other108.return_expire_ts;
     __isset = other108.__isset;
 }
 get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3906,6 +3954,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
     sort_key_filter_type = std::move(other109.sort_key_filter_type);
     sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
     validate_partition_hash = std::move(other109.validate_partition_hash);
+    return_expire_ts = std::move(other109.return_expire_ts);
     __isset = std::move(other109.__isset);
 }
 get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3921,6 +3970,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
     sort_key_filter_type = other110.sort_key_filter_type;
     sort_key_filter_pattern = other110.sort_key_filter_pattern;
     validate_partition_hash = other110.validate_partition_hash;
+    return_expire_ts = other110.return_expire_ts;
     __isset = other110.__isset;
     return *this;
 }
@@ -3937,6 +3987,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
     sort_key_filter_type = std::move(other111.sort_key_filter_type);
     sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
     validate_partition_hash = std::move(other111.validate_partition_hash);
+    return_expire_ts = std::move(other111.return_expire_ts);
     __isset = std::move(other111.__isset);
     return *this;
 }
@@ -3967,6 +4018,9 @@ void get_scanner_request::printTo(std::ostream &out) const
         << "validate_partition_hash=";
     (__isset.validate_partition_hash ? (out << to_string(validate_partition_hash))
                                      : (out << "<null>"));
+    out << ", "
+        << "return_expire_ts=";
+    (__isset.return_expire_ts ? (out << to_string(return_expire_ts)) : (out << "<null>"));
     out << ")";
 }
 
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index d264c3a..487bdd5 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -61,12 +61,16 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
 {
     ::dsn::utils::notify_event op_completed;
     int ret = -1;
-    auto callback = [&](
-        int err, std::string &&hash, std::string &&sort, std::string &&str, internal_info &&ii) {
+    auto callback = [&](int err,
+                        std::string &&hash,
+                        std::string &&sort,
+                        std::string &&val,
+                        internal_info &&ii,
+                        uint32_t expire_ts_seconds) {
         ret = err;
         hashkey = std::move(hash);
         sortkey = std::move(sort);
-        value = std::move(str);
+        value = std::move(val);
         if (info) {
             (*info) = std::move(ii);
         }
@@ -130,7 +134,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
                                      std::string(),
                                      std::string(),
                                      std::string(),
-                                     std::move(info));
+                                     std::move(info),
+                                     0);
                         }
                     }
                     return;
@@ -156,6 +161,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
         std::string hash_key, sort_key;
         pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
         std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
+        uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
+                                         ? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
+                                         : 0;
 
         auto &callback = _queue.front();
         if (callback) {
@@ -165,7 +173,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
                      std::move(hash_key),
                      std::move(sort_key),
                      std::move(value),
-                     std::move(info));
+                     std::move(info),
+                     expire_ts_seconds);
             _lock.lock();
             if (_queue.size() == 1) {
                 // keep the last callback until exit this function
@@ -215,6 +224,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
         _options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
     req.no_value = _options.no_value;
     req.__set_validate_partition_hash(_validate_partition_hash);
+    req.__set_return_expire_ts(_options.return_expire_ts);
 
     dassert(!_rpc_started, "");
     _rpc_started = true;
@@ -273,7 +283,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
 
     for (auto &callback : temp) {
         if (callback) {
-            callback(ret, std::string(), std::string(), std::string(), internal_info(info));
+            callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
         }
     }
 }
@@ -307,12 +317,14 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
                                                                      std::string &&hash_key,
                                                                      std::string &&sort_key,
                                                                      std::string &&value,
-                                                                     internal_info &&info) {
+                                                                     internal_info &&info,
+                                                                     uint32_t expire_ts_seconds) {
         user_callback(error_code,
                       std::move(hash_key),
                       std::move(sort_key),
                       std::move(value),
-                      std::move(info));
+                      std::move(info),
+                      expire_ts_seconds);
     });
 }
 
diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp
index cac7b78..e8b60be 100644
--- a/src/geo/lib/geo_client.cpp
+++ b/src/geo/lib/geo_client.cpp
@@ -861,7 +861,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
             std::string &&geo_hash_key,
             std::string &&geo_sort_key,
             std::string &&value,
-            pegasus_client::internal_info &&info) mutable {
+            pegasus_client::internal_info &&info,
+            uint32_t expire_ts_seconds) mutable {
             if (ret == PERR_SCAN_COMPLETE) {
                 cb();
                 return;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 2b12273..125efbb 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -111,6 +111,7 @@ struct key_value
 {
     1:dsn.blob      key;
     2:dsn.blob      value;
+    3:optional i32  expire_ts_seconds;
 }
 
 struct multi_put_request
@@ -253,6 +254,7 @@ struct get_scanner_request
     9:filter_type  sort_key_filter_type;
     10:dsn.blob    sort_key_filter_pattern;
     11:optional bool    validate_partition_hash;
+    12:optional bool    return_expire_ts;
 }
 
 struct scan_request
diff --git a/src/include/pegasus/client.h b/src/include/pegasus/client.h
index 1b7c186..22e945c 100644
--- a/src/include/pegasus/client.h
+++ b/src/include/pegasus/client.h
@@ -251,6 +251,7 @@ public:
         filter_type sort_key_filter_type;
         std::string sort_key_filter_pattern;
         bool no_value; // only fetch hash_key and sort_key, but not fetch value
+        bool return_expire_ts;
         scan_options()
             : timeout_ms(5000),
               batch_size(100),
@@ -258,7 +259,8 @@ public:
               stop_inclusive(false),
               hash_key_filter_type(FT_NO_FILTER),
               sort_key_filter_type(FT_NO_FILTER),
-              no_value(false)
+              no_value(false),
+              return_expire_ts(false)
         {
         }
         scan_options(const scan_options &o)
@@ -270,7 +272,8 @@ public:
               hash_key_filter_pattern(o.hash_key_filter_pattern),
               sort_key_filter_type(o.sort_key_filter_type),
               sort_key_filter_pattern(o.sort_key_filter_pattern),
-              no_value(o.no_value)
+              no_value(o.no_value),
+              return_expire_ts(o.return_expire_ts)
         {
         }
     };
@@ -308,7 +311,8 @@ public:
                                std::string && /*hash_key*/,
                                std::string && /*sort_key*/,
                                std::string && /*value*/,
-                               internal_info && /*info*/)>
+                               internal_info && /*info*/,
+                               uint32_t /*expire_ts_seconds*/)>
         async_scan_next_callback_t;
     typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
         async_get_scanner_callback_t;
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index a9c512e..15f1730 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -469,9 +469,10 @@ inline std::ostream &operator<<(std::ostream &out, const count_response &obj)
 
 typedef struct _key_value__isset
 {
-    _key_value__isset() : key(false), value(false) {}
+    _key_value__isset() : key(false), value(false), expire_ts_seconds(false) {}
     bool key : 1;
     bool value : 1;
+    bool expire_ts_seconds : 1;
 } _key_value__isset;
 
 class key_value
@@ -481,11 +482,12 @@ public:
     key_value(key_value &&);
     key_value &operator=(const key_value &);
     key_value &operator=(key_value &&);
-    key_value() {}
+    key_value() : expire_ts_seconds(0) {}
 
     virtual ~key_value() throw();
     ::dsn::blob key;
     ::dsn::blob value;
+    int32_t expire_ts_seconds;
 
     _key_value__isset __isset;
 
@@ -493,12 +495,18 @@ public:
 
     void __set_value(const ::dsn::blob &val);
 
+    void __set_expire_ts_seconds(const int32_t val);
+
     bool operator==(const key_value &rhs) const
     {
         if (!(key == rhs.key))
             return false;
         if (!(value == rhs.value))
             return false;
+        if (__isset.expire_ts_seconds != rhs.__isset.expire_ts_seconds)
+            return false;
+        else if (__isset.expire_ts_seconds && !(expire_ts_seconds == rhs.expire_ts_seconds))
+            return false;
         return true;
     }
     bool operator!=(const key_value &rhs) const { return !(*this == rhs); }
@@ -1559,7 +1567,8 @@ typedef struct _get_scanner_request__isset
           hash_key_filter_pattern(false),
           sort_key_filter_type(false),
           sort_key_filter_pattern(false),
-          validate_partition_hash(false)
+          validate_partition_hash(false),
+          return_expire_ts(false)
     {
     }
     bool start_key : 1;
@@ -1573,6 +1582,7 @@ typedef struct _get_scanner_request__isset
     bool sort_key_filter_type : 1;
     bool sort_key_filter_pattern : 1;
     bool validate_partition_hash : 1;
+    bool return_expire_ts : 1;
 } _get_scanner_request__isset;
 
 class get_scanner_request
@@ -1589,7 +1599,8 @@ public:
           no_value(0),
           hash_key_filter_type((filter_type::type)0),
           sort_key_filter_type((filter_type::type)0),
-          validate_partition_hash(0)
+          validate_partition_hash(0),
+          return_expire_ts(0)
     {
     }
 
@@ -1605,6 +1616,7 @@ public:
     filter_type::type sort_key_filter_type;
     ::dsn::blob sort_key_filter_pattern;
     bool validate_partition_hash;
+    bool return_expire_ts;
 
     _get_scanner_request__isset __isset;
 
@@ -1630,6 +1642,8 @@ public:
 
     void __set_validate_partition_hash(const bool val);
 
+    void __set_return_expire_ts(const bool val);
+
     bool operator==(const get_scanner_request &rhs) const
     {
         if (!(start_key == rhs.start_key))
@@ -1657,6 +1671,10 @@ public:
         else if (__isset.validate_partition_hash &&
                  !(validate_partition_hash == rhs.validate_partition_hash))
             return false;
+        if (__isset.return_expire_ts != rhs.__isset.return_expire_ts)
+            return false;
+        else if (__isset.return_expire_ts && !(return_expire_ts == rhs.return_expire_ts))
+            return false;
         return true;
     }
     bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
diff --git a/src/server/pegasus_scan_context.h b/src/server/pegasus_scan_context.h
index da3cd08..458288b 100644
--- a/src/server/pegasus_scan_context.h
+++ b/src/server/pegasus_scan_context.h
@@ -42,7 +42,8 @@ struct pegasus_scan_context
                          const std::string &&sort_key_filter_pattern_,
                          int32_t batch_size_,
                          bool no_value_,
-                         bool validate_partition_hash_)
+                         bool validate_partition_hash_,
+                         bool return_expire_ts_)
         : _stop_holder(std::move(stop_)),
           _hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
           _sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
@@ -57,7 +58,8 @@ struct pegasus_scan_context
               _sort_key_filter_pattern_holder.data(), 0, _sort_key_filter_pattern_holder.length()),
           batch_size(batch_size_),
           no_value(no_value_),
-          validate_partition_hash(validate_partition_hash_)
+          validate_partition_hash(validate_partition_hash_),
+          return_expire_ts(return_expire_ts_)
     {
     }
 
@@ -77,6 +79,7 @@ public:
     int32_t batch_size;
     bool no_value;
     bool validate_partition_hash;
+    bool return_expire_ts;
 };
 
 class pegasus_context_cache
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 81134b4..f759ad8 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1009,6 +1009,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
     uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
     resp.kvs.reserve(batch_count);
 
+    bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
+
     std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
         batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
 
@@ -1041,7 +1043,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
             request.sort_key_filter_pattern,
             epoch_now,
             request.no_value,
-            request.__isset.validate_partition_hash ? request.validate_partition_hash : true);
+            request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
+            return_expire_ts);
         switch (state) {
         case range_iteration_state::kNormal:
             count++;
@@ -1116,7 +1119,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
                         request.sort_key_filter_pattern.length()),
             batch_count,
             request.no_value,
-            request.__isset.validate_partition_hash ? request.validate_partition_hash : true));
+            request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
+            return_expire_ts));
         int64_t handle = _context_cache.put(std::move(context));
         resp.context_id = handle;
         // if the context is used, it will be fetched and re-put into cache,
@@ -1166,6 +1170,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
         bool no_value = context->no_value;
         bool validate_hash = context->validate_partition_hash;
+        bool return_expire_ts = context->return_expire_ts;
         bool complete = false;
         uint32_t epoch_now = ::pegasus::utils::epoch_now();
         uint64_t expire_count = 0;
@@ -1198,7 +1203,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
                                                    sort_key_filter_pattern,
                                                    epoch_now,
                                                    no_value,
-                                                   validate_hash);
+                                                   validate_hash,
+                                                   return_expire_ts);
             switch (state) {
             case range_iteration_state::kNormal:
                 count++;
@@ -2089,7 +2095,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
                                                const ::dsn::blob &sort_key_filter_pattern,
                                                uint32_t epoch_now,
                                                bool no_value,
-                                               bool request_validate_hash)
+                                               bool request_validate_hash,
+                                               bool request_expire_ts)
 {
     if (check_if_record_expired(epoch_now, value)) {
         if (_verbose_log) {
@@ -2135,6 +2142,13 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
     ::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
     kv.key.assign(std::move(key_buf), 0, raw_key.length());
 
+    // extract expire ts if necessary
+    if (request_expire_ts) {
+        auto expire_ts_seconds =
+            pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
+        kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
+    }
+
     // extract value
     if (!no_value) {
         std::string value_buf(value.data(), value.size());
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 7d23fb8..da8047a 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -222,7 +222,8 @@ private:
                               const ::dsn::blob &sort_key_filter_pattern,
                               uint32_t epoch_now,
                               bool no_value,
-                              bool request_validate_hash);
+                              bool request_validate_hash,
+                              bool request_expire_ts);
 
     range_iteration_state
     append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index be2f3ca..c5178c0 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -260,6 +260,17 @@ validate_filter(scan_data_context *context, const std::string &sort_key, const s
         return false;
     return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
 }
+
+inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
+{
+    auto epoch_now = pegasus::utils::epoch_now();
+    ts_expired = pegasus::check_if_ts_expired(epoch_now, expire_ts_seconds);
+    if (expire_ts_seconds > 0 && !ts_expired) {
+        return static_cast<int>(expire_ts_seconds - epoch_now);
+    }
+    return 0;
+}
+
 inline void scan_data_next(scan_data_context *context)
 {
     while (!context->split_completed.load() && !context->error_occurred->load() &&
@@ -269,13 +280,19 @@ inline void scan_data_next(scan_data_context *context)
                                                std::string &&hash_key,
                                                std::string &&sort_key,
                                                std::string &&value,
-                                               pegasus::pegasus_client::internal_info &&info) {
+                                               pegasus::pegasus_client::internal_info &&info,
+                                               uint32_t expire_ts_seconds) {
             if (ret == pegasus::PERR_OK) {
                 if (validate_filter(context, sort_key, value)) {
+                    bool ts_expired = false;
+                    int ttl_seconds = 0;
                     switch (context->op) {
                     case SCAN_COPY:
                         context->split_request_count++;
-                        if (context->no_overwrite) {
+                        ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+                        if (ts_expired) {
+                            scan_data_next(context);
+                        } else if (context->no_overwrite) {
                             auto callback = [context](
                                 int err,
                                 pegasus::pegasus_client::check_and_set_results &&results,
@@ -299,6 +316,7 @@ inline void scan_data_next(scan_data_context *context)
                                 context->split_request_count--;
                             };
                             pegasus::pegasus_client::check_and_set_options options;
+                            options.set_value_ttl_seconds = ttl_seconds;
                             context->client->async_check_and_set(
                                 hash_key,
                                 sort_key,
@@ -332,7 +350,8 @@ inline void scan_data_next(scan_data_context *context)
                                                        sort_key,
                                                        value,
                                                        std::move(callback),
-                                                       context->timeout_ms);
+                                                       context->timeout_ms,
+                                                       ttl_seconds);
                         }
                         break;
                     case SCAN_CLEAR:
@@ -395,28 +414,34 @@ inline void scan_data_next(scan_data_context *context)
                         break;
                     case SCAN_GEN_GEO:
                         context->split_request_count++;
-                        context->geoclient->async_set(
-                            hash_key,
-                            sort_key,
-                            value,
-                            [context](int err, pegasus::pegasus_client::internal_info &&info) {
-                                if (err != pegasus::PERR_OK) {
-                                    if (!context->split_completed.exchange(true)) {
-                                        fprintf(stderr,
-                                                "ERROR: split[%d] async set failed: %s\n",
-                                                context->split_id,
-                                                context->client->get_error_string(err));
-                                        context->error_occurred->store(true);
+                        ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+                        if (ts_expired) {
+                            scan_data_next(context);
+                        } else {
+                            context->geoclient->async_set(
+                                hash_key,
+                                sort_key,
+                                value,
+                                [context](int err, pegasus::pegasus_client::internal_info &&info) {
+                                    if (err != pegasus::PERR_OK) {
+                                        if (!context->split_completed.exchange(true)) {
+                                            fprintf(stderr,
+                                                    "ERROR: split[%d] async set failed: %s\n",
+                                                    context->split_id,
+                                                    context->client->get_error_string(err));
+                                            context->error_occurred->store(true);
+                                        }
+                                    } else {
+                                        context->split_rows++;
+                                        scan_data_next(context);
                                     }
-                                } else {
-                                    context->split_rows++;
-                                    scan_data_next(context);
-                                }
-                                // should put "split_request_count--" at end of the scope,
-                                // to prevent that split_request_count becomes 0 in the middle.
-                                context->split_request_count--;
-                            },
-                            context->timeout_ms);
+                                    // should put "split_request_count--" at end of the scope,
+                                    // to prevent that split_request_count becomes 0 in the middle.
+                                    context->split_request_count--;
+                                },
+                                context->timeout_ms,
+                                ttl_seconds);
+                        }
                         break;
                     default:
                         dassert(false, "op = %d", context->op);
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
index 28c53fa..44eabbb 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -1540,6 +1540,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
                                            {"no_overwrite", no_argument, 0, 'n'},
                                            {"no_value", no_argument, 0, 'i'},
                                            {"geo_data", no_argument, 0, 'g'},
+                                           {"no_ttl", no_argument, 0, 'e'},
                                            {0, 0, 0, 0}};
 
     std::string target_cluster_name;
@@ -1559,13 +1560,14 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
     pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
     std::string value_filter_pattern;
     pegasus::pegasus_client::scan_options options;
+    options.return_expire_ts = true;
 
     optind = 0;
     while (true) {
         int option_index = 0;
         int c;
         c = getopt_long(
-            args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nig", long_options, &option_index);
+            args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nige", long_options, &option_index);
         if (c == -1)
             break;
         switch (c) {
@@ -1640,6 +1642,9 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
         case 'g':
             is_geo_data = true;
             break;
+        case 'e':
+            options.return_expire_ts = false;
+            break;
         default:
             return false;
         }
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 43c2eb8..1b05ef1 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -276,7 +276,8 @@ static command_executor commands[] = {
         "[-y|--sort_key_filter_pattern str] "
         "[-v|--value_filter_type anywhere|prefix|postfix|exact] "
         "[-z|--value_filter_pattern str] "
-        "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data]",
+        "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] "
+        "[-e|--no_ttl]",
         data_operations,
     },
     {
diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp
index ca8a5f1..c34cdc9 100644
--- a/src/test/function_test/test_scan.cpp
+++ b/src/test/function_test/test_scan.cpp
@@ -28,6 +28,7 @@
 #include <pegasus/client.h>
 #include <gtest/gtest.h>
 #include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
 
 using namespace ::pegasus;
 
@@ -37,6 +38,8 @@ static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOP
 static char buffer[256];
 static std::map<std::string, std::map<std::string, std::string>> base;
 static std::string expected_hash_key;
+static constexpr int ttl_seconds = 24 * 60 * 60;
+static std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> ttl_base;
 
 // REQUIRED: 'buffer' has been filled with random chars.
 static const std::string random_string()
@@ -52,6 +55,25 @@ static const std::string random_string()
     }
 }
 
+// Stores (value, expire_ts_seconds) at data[hash_key][sort_key]. If that key pair is already
+// present, a fatal gtest failure is raised and the existing entry is left unchanged.
+static void
+check_and_put(std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> &data,
+              const std::string &hash_key,
+              const std::string &sort_key,
+              const std::string &value,
+              uint32_t expire_ts_seconds)
+{
+    auto &row = data[hash_key];
+    const auto cell = row.find(sort_key);
+    ASSERT_EQ(row.end(), cell)
+        << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+        << ", old_value=" << cell->second.first << ", new_value=" << value
+        << ", old_expire_ts_seconds=" << cell->second.second
+        << ", new_expire_ts_seconds=" << expire_ts_seconds;
+    row[sort_key] = std::make_pair(value, expire_ts_seconds);
+}
+
 static void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,
                           const std::string &hash_key,
                           const std::string &sort_key,
@@ -78,6 +100,48 @@ static void check_and_put(std::map<std::string, std::string> &data,
     data[sort_key] = value;
 }
 
+// Asserts equal values and that data's expire ts lies within [base, base + 1] seconds.
+static void compare(const std::pair<std::string, uint32_t> &data,
+                    const std::pair<std::string, uint32_t> &base,
+                    const std::string &hash_key,
+                    const std::string &sort_key)
+{
+    ASSERT_EQ(base.first, data.first)
+        << "Diff value: hash_key=" << hash_key << ", sort_key=" << sort_key
+        << ", data_value=" << data.first << ", data_expire_ts_seconds=" << data.second
+        << ", base_value=" << base.first << ", base_expire_ts_seconds=" << base.second;
+    ASSERT_TRUE(data.second >= base.second && data.second - base.second <= 1)
+        << "Diff expire_ts_seconds: hash_key=" << hash_key << ", sort_key=" << sort_key
+        << ", data_value=" << data.first << ", data_expire_ts_seconds=" << data.second
+        << ", base_value=" << base.first << ", base_expire_ts_seconds=" << base.second;
+}
+
+// Walks `data` and `base` in lockstep (std::map iterates in sort_key order) and asserts that both
+// hold the same sort_keys with matching (value, expire_ts_seconds) entries under `hash_key`.
+static void compare(const std::map<std::string, std::pair<std::string, uint32_t>> &data,
+                    const std::map<std::string, std::pair<std::string, uint32_t>> &base,
+                    const std::string &hash_key)
+{
+    auto cur = data.begin();
+    auto ref = base.begin();
+    for (; cur != data.end(); ++cur, ++ref) {
+        ASSERT_NE(base.end(), ref) << "Only in data: hash_key=" << hash_key
+                                   << ", sort_key=" << cur->first << ", value=" << cur->second.first
+                                   << ", expire_ts_seconds=" << cur->second.second;
+        ASSERT_EQ(ref->first, cur->first)
+            << "Diff sort_key: hash_key=" << hash_key << ", data_sort_key=" << cur->first
+            << ", data_value=" << cur->second.first
+            << ", data_expire_ts_seconds=" << cur->second.second << ", base_sort_key=" << ref->first
+            << ", base_value=" << ref->second.first
+            << ", base_expire_ts_seconds=" << ref->second.second;
+        compare(cur->second, ref->second, hash_key, cur->first);
+    }
+    ASSERT_EQ(base.end(), ref)
+        << "Only in base: hash_key=" << hash_key << ", sort_key=" << ref->first
+        << ", value=" << ref->second.first << ", expire_ts_seconds=" << ref->second.second;
+    dinfo("Data and base are the same.");
+}
+
 static void compare(const std::map<std::string, std::string> &data,
                     const std::map<std::string, std::string> &base,
                     const std::string &hash_key)
@@ -98,8 +162,8 @@ static void compare(const std::map<std::string, std::string> &data,
     dinfo("Data and base are the same.");
 }
 
-static void compare(std::map<std::string, std::map<std::string, std::string>> &data,
-                    std::map<std::string, std::map<std::string, std::string>> &base)
+template <typename T, typename U>
+static void compare(const T &data, const U &base)
 {
     for (auto it1 = data.begin(), it2 = base.begin();; ++it1, ++it2) {
         if (it1 == data.end()) {
@@ -179,7 +243,7 @@ static void fill_database()
         base[expected_hash_key][sort_key] = value;
     }
 
-    while (base.size() < 1000) {
+    while (base.size() < 500) {
         hash_key = random_string();
         while (base[hash_key].size() < 10) {
             sort_key = random_string();
@@ -192,6 +256,22 @@ static void fill_database()
         }
     }
 
+    while (base.size() < 1000) {
+        hash_key = random_string();
+        while (base[hash_key].size() < 10) {
+            sort_key = random_string();
+            value = random_string();
+            auto expire_ts_seconds = static_cast<uint32_t>(ttl_seconds) + utils::epoch_now();
+            int ret = client->set(hash_key, sort_key, value, 5000, ttl_seconds, nullptr);
+            ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+                                    << ", sort_key=" << sort_key
+                                    << ", error=" << client->get_error_string(ret);
+            base[hash_key][sort_key] = value;
+            ttl_base[hash_key][sort_key] =
+                std::pair<std::string, uint32_t>(value, expire_ts_seconds);
+        }
+    }
+
     ddebug("Database filled.");
 }
 
@@ -416,6 +496,58 @@ TEST_F(scan, OVERALL)
     compare(data, base);
 }
 
+// Scans the whole table with return_expire_ts enabled and checks values and expire timestamps.
+TEST_F(scan, REQUEST_EXPIRE_TS)
+{
+    ddebug("TEST REQUEST_EXPIRE_TS...");
+    pegasus_client::scan_options options;
+    options.return_expire_ts = true;
+    std::vector<pegasus_client::pegasus_scanner *> raw_scanners;
+    int ret = client->get_unordered_scanners(3, options, raw_scanners);
+    ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+                                     << client->get_error_string(ret);
+    std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+    for (auto raw_scanner : raw_scanners) {
+        ASSERT_NE(nullptr, raw_scanner);
+        scanners.push_back(raw_scanner->get_smart_wrapper());
+    }
+    raw_scanners.clear();
+    ASSERT_LE(scanners.size(), 3);
+
+    std::map<std::string, std::map<std::string, std::string>> data;
+    std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> ttl_data;
+    for (auto scanner : scanners) {
+        std::atomic_bool split_completed(false);
+        while (!split_completed.load()) {
+            dsn::utils::notify_event op_completed;
+            scanner->async_next([&](int err,
+                                    std::string &&hash_key,
+                                    std::string &&sort_key,
+                                    std::string &&value,
+                                    pegasus::pegasus_client::internal_info &&info,
+                                    uint32_t expire_ts_seconds) {
+                if (err == pegasus::PERR_OK) {
+                    check_and_put(data, hash_key, sort_key, value);
+                    if (expire_ts_seconds > 0) {
+                        check_and_put(ttl_data, hash_key, sort_key, value, expire_ts_seconds);
+                    }
+                } else if (err == pegasus::PERR_SCAN_COMPLETE) {
+                    split_completed.store(true);
+                } else {
+                    // Non-fatal by design: a fatal ASSERT returns before notify(), hanging wait().
+                    ADD_FAILURE() << "Error occurred when scan. error="
+                                  << client->get_error_string(err);
+                    split_completed.store(true);
+                }
+                op_completed.notify();
+            });
+            op_completed.wait();
+        }
+    }
+    compare(data, base);
+    compare(ttl_data, ttl_base);
+}
+
 TEST_F(scan, ITERATION_TIME_LIMIT)
 {
     // update iteration threshold to 1ms

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org