You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/07/06 08:50:53 UTC
[incubator-pegasus] branch master updated: feat: support preserving TTL for copy_data (#752)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d5128fa feat: support preserving TTL for copy_data (#752)
d5128fa is described below
commit d5128fa5e0813aa73a68f6df02100efdb025721d
Author: Dan Wang <em...@126.com>
AuthorDate: Tue Jul 6 16:50:43 2021 +0800
feat: support preserving TTL for copy_data (#752)
---
src/base/rrdb_types.cpp | 54 +++++++++++++
src/client_lib/pegasus_scanner_impl.cpp | 28 +++++--
src/geo/lib/geo_client.cpp | 3 +-
src/idl/rrdb.thrift | 2 +
src/include/pegasus/client.h | 10 ++-
src/include/rrdb/rrdb_types.h | 26 +++++-
src/server/pegasus_scan_context.h | 7 +-
src/server/pegasus_server_impl.cpp | 22 ++++-
src/server/pegasus_server_impl.h | 3 +-
src/shell/command_helper.h | 73 +++++++++++------
src/shell/commands/data_operations.cpp | 7 +-
src/shell/main.cpp | 3 +-
src/test/function_test/test_scan.cpp | 138 +++++++++++++++++++++++++++++++-
13 files changed, 324 insertions(+), 52 deletions(-)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 42c6778..5e0885e 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -929,6 +929,12 @@ void key_value::__set_key(const ::dsn::blob &val) { this->key = val; }
void key_value::__set_value(const ::dsn::blob &val) { this->value = val; }
+void key_value::__set_expire_ts_seconds(const int32_t val)
+{
+ this->expire_ts_seconds = val;
+ __isset.expire_ts_seconds = true;
+}
+
uint32_t key_value::read(::apache::thrift::protocol::TProtocol *iprot)
{
@@ -964,6 +970,14 @@ uint32_t key_value::read(::apache::thrift::protocol::TProtocol *iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ xfer += iprot->readI32(this->expire_ts_seconds);
+ this->__isset.expire_ts_seconds = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -990,6 +1004,11 @@ uint32_t key_value::write(::apache::thrift::protocol::TProtocol *oprot) const
xfer += this->value.write(oprot);
xfer += oprot->writeFieldEnd();
+ if (this->__isset.expire_ts_seconds) {
+ xfer += oprot->writeFieldBegin("expire_ts_seconds", ::apache::thrift::protocol::T_I32, 3);
+ xfer += oprot->writeI32(this->expire_ts_seconds);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -1000,6 +1019,7 @@ void swap(key_value &a, key_value &b)
using ::std::swap;
swap(a.key, b.key);
swap(a.value, b.value);
+ swap(a.expire_ts_seconds, b.expire_ts_seconds);
swap(a.__isset, b.__isset);
}
@@ -1007,18 +1027,21 @@ key_value::key_value(const key_value &other20)
{
key = other20.key;
value = other20.value;
+ expire_ts_seconds = other20.expire_ts_seconds;
__isset = other20.__isset;
}
key_value::key_value(key_value &&other21)
{
key = std::move(other21.key);
value = std::move(other21.value);
+ expire_ts_seconds = std::move(other21.expire_ts_seconds);
__isset = std::move(other21.__isset);
}
key_value &key_value::operator=(const key_value &other22)
{
key = other22.key;
value = other22.value;
+ expire_ts_seconds = other22.expire_ts_seconds;
__isset = other22.__isset;
return *this;
}
@@ -1026,6 +1049,7 @@ key_value &key_value::operator=(key_value &&other23)
{
key = std::move(other23.key);
value = std::move(other23.value);
+ expire_ts_seconds = std::move(other23.expire_ts_seconds);
__isset = std::move(other23.__isset);
return *this;
}
@@ -1036,6 +1060,9 @@ void key_value::printTo(std::ostream &out) const
out << "key=" << to_string(key);
out << ", "
<< "value=" << to_string(value);
+ out << ", "
+ << "expire_ts_seconds=";
+ (__isset.expire_ts_seconds ? (out << to_string(expire_ts_seconds)) : (out << "<null>"));
out << ")";
}
@@ -3679,6 +3706,12 @@ void get_scanner_request::__set_validate_partition_hash(const bool val)
__isset.validate_partition_hash = true;
}
+void get_scanner_request::__set_return_expire_ts(const bool val)
+{
+ this->return_expire_ts = val;
+ __isset.return_expire_ts = true;
+}
+
uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
{
@@ -3790,6 +3823,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 12:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->return_expire_ts);
+ this->__isset.return_expire_ts = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -3856,6 +3897,11 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot
xfer += oprot->writeBool(this->validate_partition_hash);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.return_expire_ts) {
+ xfer += oprot->writeFieldBegin("return_expire_ts", ::apache::thrift::protocol::T_BOOL, 12);
+ xfer += oprot->writeBool(this->return_expire_ts);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -3875,6 +3921,7 @@ void swap(get_scanner_request &a, get_scanner_request &b)
swap(a.sort_key_filter_type, b.sort_key_filter_type);
swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern);
swap(a.validate_partition_hash, b.validate_partition_hash);
+ swap(a.return_expire_ts, b.return_expire_ts);
swap(a.__isset, b.__isset);
}
@@ -3891,6 +3938,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108)
sort_key_filter_type = other108.sort_key_filter_type;
sort_key_filter_pattern = other108.sort_key_filter_pattern;
validate_partition_hash = other108.validate_partition_hash;
+ return_expire_ts = other108.return_expire_ts;
__isset = other108.__isset;
}
get_scanner_request::get_scanner_request(get_scanner_request &&other109)
@@ -3906,6 +3954,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109)
sort_key_filter_type = std::move(other109.sort_key_filter_type);
sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
validate_partition_hash = std::move(other109.validate_partition_hash);
+ return_expire_ts = std::move(other109.return_expire_ts);
__isset = std::move(other109.__isset);
}
get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
@@ -3921,6 +3970,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o
sort_key_filter_type = other110.sort_key_filter_type;
sort_key_filter_pattern = other110.sort_key_filter_pattern;
validate_partition_hash = other110.validate_partition_hash;
+ return_expire_ts = other110.return_expire_ts;
__isset = other110.__isset;
return *this;
}
@@ -3937,6 +3987,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1
sort_key_filter_type = std::move(other111.sort_key_filter_type);
sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
validate_partition_hash = std::move(other111.validate_partition_hash);
+ return_expire_ts = std::move(other111.return_expire_ts);
__isset = std::move(other111.__isset);
return *this;
}
@@ -3967,6 +4018,9 @@ void get_scanner_request::printTo(std::ostream &out) const
<< "validate_partition_hash=";
(__isset.validate_partition_hash ? (out << to_string(validate_partition_hash))
: (out << "<null>"));
+ out << ", "
+ << "return_expire_ts=";
+ (__isset.return_expire_ts ? (out << to_string(return_expire_ts)) : (out << "<null>"));
out << ")";
}
diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp
index d264c3a..487bdd5 100644
--- a/src/client_lib/pegasus_scanner_impl.cpp
+++ b/src/client_lib/pegasus_scanner_impl.cpp
@@ -61,12 +61,16 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
{
::dsn::utils::notify_event op_completed;
int ret = -1;
- auto callback = [&](
- int err, std::string &&hash, std::string &&sort, std::string &&str, internal_info &&ii) {
+ auto callback = [&](int err,
+ std::string &&hash,
+ std::string &&sort,
+ std::string &&val,
+ internal_info &&ii,
+ uint32_t expire_ts_seconds) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
- value = std::move(str);
+ value = std::move(val);
if (info) {
(*info) = std::move(ii);
}
@@ -130,7 +134,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::string(),
- std::move(info));
+ std::move(info),
+ 0);
}
}
return;
@@ -156,6 +161,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
+ uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
+ ? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
+ : 0;
auto &callback = _queue.front();
if (callback) {
@@ -165,7 +173,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(hash_key),
std::move(sort_key),
std::move(value),
- std::move(info));
+ std::move(info),
+ expire_ts_seconds);
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
@@ -215,6 +224,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
_options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
req.no_value = _options.no_value;
req.__set_validate_partition_hash(_validate_partition_hash);
+ req.__set_return_expire_ts(_options.return_expire_ts);
dassert(!_rpc_started, "");
_rpc_started = true;
@@ -273,7 +283,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
for (auto &callback : temp) {
if (callback) {
- callback(ret, std::string(), std::string(), std::string(), internal_info(info));
+ callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
}
}
}
@@ -307,12 +317,14 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&hash_key,
std::string &&sort_key,
std::string &&value,
- internal_info &&info) {
+ internal_info &&info,
+ uint32_t expire_ts_seconds) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
- std::move(info));
+ std::move(info),
+ expire_ts_seconds);
});
}
diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp
index cac7b78..e8b60be 100644
--- a/src/geo/lib/geo_client.cpp
+++ b/src/geo/lib/geo_client.cpp
@@ -861,7 +861,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_hash_key,
std::string &&geo_sort_key,
std::string &&value,
- pegasus_client::internal_info &&info) mutable {
+ pegasus_client::internal_info &&info,
+ uint32_t expire_ts_seconds) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 2b12273..125efbb 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -111,6 +111,7 @@ struct key_value
{
1:dsn.blob key;
2:dsn.blob value;
+ 3:optional i32 expire_ts_seconds;
}
struct multi_put_request
@@ -253,6 +254,7 @@ struct get_scanner_request
9:filter_type sort_key_filter_type;
10:dsn.blob sort_key_filter_pattern;
11:optional bool validate_partition_hash;
+ 12:optional bool return_expire_ts;
}
struct scan_request
diff --git a/src/include/pegasus/client.h b/src/include/pegasus/client.h
index 1b7c186..22e945c 100644
--- a/src/include/pegasus/client.h
+++ b/src/include/pegasus/client.h
@@ -251,6 +251,7 @@ public:
filter_type sort_key_filter_type;
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
+ bool return_expire_ts;
scan_options()
: timeout_ms(5000),
batch_size(100),
@@ -258,7 +259,8 @@ public:
stop_inclusive(false),
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
- no_value(false)
+ no_value(false),
+ return_expire_ts(false)
{
}
scan_options(const scan_options &o)
@@ -270,7 +272,8 @@ public:
hash_key_filter_pattern(o.hash_key_filter_pattern),
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
- no_value(o.no_value)
+ no_value(o.no_value),
+ return_expire_ts(o.return_expire_ts)
{
}
};
@@ -308,7 +311,8 @@ public:
std::string && /*hash_key*/,
std::string && /*sort_key*/,
std::string && /*value*/,
- internal_info && /*info*/)>
+ internal_info && /*info*/,
+ uint32_t /*expire_ts_seconds*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index a9c512e..15f1730 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -469,9 +469,10 @@ inline std::ostream &operator<<(std::ostream &out, const count_response &obj)
typedef struct _key_value__isset
{
- _key_value__isset() : key(false), value(false) {}
+ _key_value__isset() : key(false), value(false), expire_ts_seconds(false) {}
bool key : 1;
bool value : 1;
+ bool expire_ts_seconds : 1;
} _key_value__isset;
class key_value
@@ -481,11 +482,12 @@ public:
key_value(key_value &&);
key_value &operator=(const key_value &);
key_value &operator=(key_value &&);
- key_value() {}
+ key_value() : expire_ts_seconds(0) {}
virtual ~key_value() throw();
::dsn::blob key;
::dsn::blob value;
+ int32_t expire_ts_seconds;
_key_value__isset __isset;
@@ -493,12 +495,18 @@ public:
void __set_value(const ::dsn::blob &val);
+ void __set_expire_ts_seconds(const int32_t val);
+
bool operator==(const key_value &rhs) const
{
if (!(key == rhs.key))
return false;
if (!(value == rhs.value))
return false;
+ if (__isset.expire_ts_seconds != rhs.__isset.expire_ts_seconds)
+ return false;
+ else if (__isset.expire_ts_seconds && !(expire_ts_seconds == rhs.expire_ts_seconds))
+ return false;
return true;
}
bool operator!=(const key_value &rhs) const { return !(*this == rhs); }
@@ -1559,7 +1567,8 @@ typedef struct _get_scanner_request__isset
hash_key_filter_pattern(false),
sort_key_filter_type(false),
sort_key_filter_pattern(false),
- validate_partition_hash(false)
+ validate_partition_hash(false),
+ return_expire_ts(false)
{
}
bool start_key : 1;
@@ -1573,6 +1582,7 @@ typedef struct _get_scanner_request__isset
bool sort_key_filter_type : 1;
bool sort_key_filter_pattern : 1;
bool validate_partition_hash : 1;
+ bool return_expire_ts : 1;
} _get_scanner_request__isset;
class get_scanner_request
@@ -1589,7 +1599,8 @@ public:
no_value(0),
hash_key_filter_type((filter_type::type)0),
sort_key_filter_type((filter_type::type)0),
- validate_partition_hash(0)
+ validate_partition_hash(0),
+ return_expire_ts(0)
{
}
@@ -1605,6 +1616,7 @@ public:
filter_type::type sort_key_filter_type;
::dsn::blob sort_key_filter_pattern;
bool validate_partition_hash;
+ bool return_expire_ts;
_get_scanner_request__isset __isset;
@@ -1630,6 +1642,8 @@ public:
void __set_validate_partition_hash(const bool val);
+ void __set_return_expire_ts(const bool val);
+
bool operator==(const get_scanner_request &rhs) const
{
if (!(start_key == rhs.start_key))
@@ -1657,6 +1671,10 @@ public:
else if (__isset.validate_partition_hash &&
!(validate_partition_hash == rhs.validate_partition_hash))
return false;
+ if (__isset.return_expire_ts != rhs.__isset.return_expire_ts)
+ return false;
+ else if (__isset.return_expire_ts && !(return_expire_ts == rhs.return_expire_ts))
+ return false;
return true;
}
bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); }
diff --git a/src/server/pegasus_scan_context.h b/src/server/pegasus_scan_context.h
index da3cd08..458288b 100644
--- a/src/server/pegasus_scan_context.h
+++ b/src/server/pegasus_scan_context.h
@@ -42,7 +42,8 @@ struct pegasus_scan_context
const std::string &&sort_key_filter_pattern_,
int32_t batch_size_,
bool no_value_,
- bool validate_partition_hash_)
+ bool validate_partition_hash_,
+ bool return_expire_ts_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
@@ -57,7 +58,8 @@ struct pegasus_scan_context
_sort_key_filter_pattern_holder.data(), 0, _sort_key_filter_pattern_holder.length()),
batch_size(batch_size_),
no_value(no_value_),
- validate_partition_hash(validate_partition_hash_)
+ validate_partition_hash(validate_partition_hash_),
+ return_expire_ts(return_expire_ts_)
{
}
@@ -77,6 +79,7 @@ public:
int32_t batch_size;
bool no_value;
bool validate_partition_hash;
+ bool return_expire_ts;
};
class pegasus_context_cache
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 81134b4..f759ad8 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1009,6 +1009,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
resp.kvs.reserve(batch_count);
+ bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
+
std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
@@ -1041,7 +1043,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.sort_key_filter_pattern,
epoch_now,
request.no_value,
- request.__isset.validate_partition_hash ? request.validate_partition_hash : true);
+ request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
+ return_expire_ts);
switch (state) {
case range_iteration_state::kNormal:
count++;
@@ -1116,7 +1119,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.sort_key_filter_pattern.length()),
batch_count,
request.no_value,
- request.__isset.validate_partition_hash ? request.validate_partition_hash : true));
+ request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
+ return_expire_ts));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used, it will be fetched and re-put into cache,
@@ -1166,6 +1170,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
bool no_value = context->no_value;
bool validate_hash = context->validate_partition_hash;
+ bool return_expire_ts = context->return_expire_ts;
bool complete = false;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;
@@ -1198,7 +1203,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
sort_key_filter_pattern,
epoch_now,
no_value,
- validate_hash);
+ validate_hash,
+ return_expire_ts);
switch (state) {
case range_iteration_state::kNormal:
count++;
@@ -2089,7 +2095,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value,
- bool request_validate_hash)
+ bool request_validate_hash,
+ bool request_expire_ts)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
@@ -2135,6 +2142,13 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());
+ // extract expire ts if necessary
+ if (request_expire_ts) {
+ auto expire_ts_seconds =
+ pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
+ kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
+ }
+
// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 7d23fb8..da8047a 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -222,7 +222,8 @@ private:
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value,
- bool request_validate_hash);
+ bool request_validate_hash,
+ bool request_expire_ts);
range_iteration_state
append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index be2f3ca..c5178c0 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -260,6 +260,17 @@ validate_filter(scan_data_context *context, const std::string &sort_key, const s
return false;
return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
}
+
+inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
+{
+ auto epoch_now = pegasus::utils::epoch_now();
+ ts_expired = pegasus::check_if_ts_expired(epoch_now, expire_ts_seconds);
+ if (expire_ts_seconds > 0 && !ts_expired) {
+ return static_cast<int>(expire_ts_seconds - epoch_now);
+ }
+ return 0;
+}
+
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
@@ -269,13 +280,19 @@ inline void scan_data_next(scan_data_context *context)
std::string &&hash_key,
std::string &&sort_key,
std::string &&value,
- pegasus::pegasus_client::internal_info &&info) {
+ pegasus::pegasus_client::internal_info &&info,
+ uint32_t expire_ts_seconds) {
if (ret == pegasus::PERR_OK) {
if (validate_filter(context, sort_key, value)) {
+ bool ts_expired = false;
+ int ttl_seconds = 0;
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
- if (context->no_overwrite) {
+ ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+ if (ts_expired) {
+ scan_data_next(context);
+ } else if (context->no_overwrite) {
auto callback = [context](
int err,
pegasus::pegasus_client::check_and_set_results &&results,
@@ -299,6 +316,7 @@ inline void scan_data_next(scan_data_context *context)
context->split_request_count--;
};
pegasus::pegasus_client::check_and_set_options options;
+ options.set_value_ttl_seconds = ttl_seconds;
context->client->async_check_and_set(
hash_key,
sort_key,
@@ -332,7 +350,8 @@ inline void scan_data_next(scan_data_context *context)
sort_key,
value,
std::move(callback),
- context->timeout_ms);
+ context->timeout_ms,
+ ttl_seconds);
}
break;
case SCAN_CLEAR:
@@ -395,28 +414,34 @@ inline void scan_data_next(scan_data_context *context)
break;
case SCAN_GEN_GEO:
context->split_request_count++;
- context->geoclient->async_set(
- hash_key,
- sort_key,
- value,
- [context](int err, pegasus::pegasus_client::internal_info &&info) {
- if (err != pegasus::PERR_OK) {
- if (!context->split_completed.exchange(true)) {
- fprintf(stderr,
- "ERROR: split[%d] async set failed: %s\n",
- context->split_id,
- context->client->get_error_string(err));
- context->error_occurred->store(true);
+ ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
+ if (ts_expired) {
+ scan_data_next(context);
+ } else {
+ context->geoclient->async_set(
+ hash_key,
+ sort_key,
+ value,
+ [context](int err, pegasus::pegasus_client::internal_info &&info) {
+ if (err != pegasus::PERR_OK) {
+ if (!context->split_completed.exchange(true)) {
+ fprintf(stderr,
+ "ERROR: split[%d] async set failed: %s\n",
+ context->split_id,
+ context->client->get_error_string(err));
+ context->error_occurred->store(true);
+ }
+ } else {
+ context->split_rows++;
+ scan_data_next(context);
}
- } else {
- context->split_rows++;
- scan_data_next(context);
- }
- // should put "split_request_count--" at end of the scope,
- // to prevent that split_request_count becomes 0 in the middle.
- context->split_request_count--;
- },
- context->timeout_ms);
+ // should put "split_request_count--" at end of the scope,
+ // to prevent that split_request_count becomes 0 in the middle.
+ context->split_request_count--;
+ },
+ context->timeout_ms,
+ ttl_seconds);
+ }
break;
default:
dassert(false, "op = %d", context->op);
diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp
index 28c53fa..44eabbb 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -1540,6 +1540,7 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
{"no_overwrite", no_argument, 0, 'n'},
{"no_value", no_argument, 0, 'i'},
{"geo_data", no_argument, 0, 'g'},
+ {"no_ttl", no_argument, 0, 'e'},
{0, 0, 0, 0}};
std::string target_cluster_name;
@@ -1559,13 +1560,14 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
std::string value_filter_pattern;
pegasus::pegasus_client::scan_options options;
+ options.return_expire_ts = true;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(
- args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nig", long_options, &option_index);
+ args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nige", long_options, &option_index);
if (c == -1)
break;
switch (c) {
@@ -1640,6 +1642,9 @@ bool copy_data(command_executor *e, shell_context *sc, arguments args)
case 'g':
is_geo_data = true;
break;
+ case 'e':
+ options.return_expire_ts = false;
+ break;
default:
return false;
}
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 43c2eb8..1b05ef1 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -276,7 +276,8 @@ static command_executor commands[] = {
"[-y|--sort_key_filter_pattern str] "
"[-v|--value_filter_type anywhere|prefix|postfix|exact] "
"[-z|--value_filter_pattern str] "
- "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data]",
+ "[-n|--no_overwrite] [-i|--no_value] [-g|--geo_data] "
+ "[-e|--no_ttl]",
data_operations,
},
{
diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp
index ca8a5f1..c34cdc9 100644
--- a/src/test/function_test/test_scan.cpp
+++ b/src/test/function_test/test_scan.cpp
@@ -28,6 +28,7 @@
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"
+#include "base/pegasus_utils.h"
using namespace ::pegasus;
@@ -37,6 +38,8 @@ static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOP
static char buffer[256];
static std::map<std::string, std::map<std::string, std::string>> base;
static std::string expected_hash_key;
+static constexpr int ttl_seconds = 24 * 60 * 60;
+static std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> ttl_base;
// REQUIRED: 'buffer' has been filled with random chars.
static const std::string random_string()
@@ -52,6 +55,25 @@ static const std::string random_string()
}
}
+static void
+check_and_put(std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> &data,
+ const std::string &hash_key,
+ const std::string &sort_key,
+ const std::string &value,
+ uint32_t expire_ts_seconds)
+{
+ auto it1 = data.find(hash_key);
+ if (it1 != data.end()) {
+ auto it2 = it1->second.find(sort_key);
+ ASSERT_EQ(it1->second.end(), it2)
+ << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
+ << ", old_value=" << it2->second.first << ", new_value=" << value
+ << ", old_expire_ts_seconds=" << it2->second.second
+ << ", new_expire_ts_seconds=" << expire_ts_seconds;
+ }
+ data[hash_key][sort_key] = std::pair<std::string, uint32_t>(value, expire_ts_seconds);
+}
+
static void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,
const std::string &hash_key,
const std::string &sort_key,
@@ -78,6 +100,48 @@ static void check_and_put(std::map<std::string, std::string> &data,
data[sort_key] = value;
}
+static void compare(const std::pair<std::string, uint32_t> &data,
+ const std::pair<std::string, uint32_t> &base,
+ const std::string &hash_key,
+ const std::string sort_key)
+{
+ ASSERT_EQ(base.first, data.first)
+ << "Diff value: hash_key=" << hash_key << ", sort_key=" << sort_key
+ << ", data_value=" << data.first << ", data_expire_ts_seconds=" << data.second
+ << ", base_value=" << base.first << ", base_expire_ts_seconds=" << base.second;
+
+ ASSERT_TRUE(data.second >= base.second && data.second - base.second <= 1)
+ << "Diff expire_ts_seconds: hash_key=" << hash_key << ", sort_key=" << sort_key
+ << ", data_value=" << data.first << ", data_expire_ts_seconds=" << data.second
+ << ", base_value=" << base.first << ", base_expire_ts_seconds=" << base.second;
+}
+
+static void compare(const std::map<std::string, std::pair<std::string, uint32_t>> &data,
+ const std::map<std::string, std::pair<std::string, uint32_t>> &base,
+ const std::string &hash_key)
+{
+ for (auto it1 = data.begin(), it2 = base.begin();; ++it1, ++it2) {
+ if (it1 == data.end()) {
+ ASSERT_EQ(base.end(), it2)
+ << "Only in base: hash_key=" << hash_key << ", sort_key=" << it2->first
+ << ", value=" << it2->second.first << ", expire_ts_seconds=" << it2->second.second;
+ break;
+ }
+ ASSERT_NE(base.end(), it2) << "Only in data: hash_key=" << hash_key
+ << ", sort_key=" << it1->first << ", value=" << it1->second.first
+ << ", expire_ts_seconds=" << it1->second.second;
+ ASSERT_EQ(it2->first, it1->first)
+ << "Diff sort_key: hash_key=" << hash_key << ", data_sort_key=" << it1->first
+ << ", data_value=" << it1->second.first
+ << ", data_expire_ts_seconds=" << it1->second.second << ", base_sort_key=" << it2->first
+ << ", base_value=" << it2->second.first
+ << ", base_expire_ts_seconds=" << it2->second.second;
+ compare(it1->second, it2->second, hash_key, it1->first);
+ }
+
+ dinfo("Data and base are the same.");
+}
+
static void compare(const std::map<std::string, std::string> &data,
const std::map<std::string, std::string> &base,
const std::string &hash_key)
@@ -98,8 +162,8 @@ static void compare(const std::map<std::string, std::string> &data,
dinfo("Data and base are the same.");
}
-static void compare(std::map<std::string, std::map<std::string, std::string>> &data,
- std::map<std::string, std::map<std::string, std::string>> &base)
+template <typename T, typename U>
+static void compare(const T &data, const U &base)
{
for (auto it1 = data.begin(), it2 = base.begin();; ++it1, ++it2) {
if (it1 == data.end()) {
@@ -179,7 +243,7 @@ static void fill_database()
base[expected_hash_key][sort_key] = value;
}
- while (base.size() < 1000) {
+ while (base.size() < 500) {
hash_key = random_string();
while (base[hash_key].size() < 10) {
sort_key = random_string();
@@ -192,6 +256,22 @@ static void fill_database()
}
}
+ while (base.size() < 1000) {
+ hash_key = random_string();
+ while (base[hash_key].size() < 10) {
+ sort_key = random_string();
+ value = random_string();
+ auto expire_ts_seconds = static_cast<uint32_t>(ttl_seconds) + utils::epoch_now();
+ int ret = client->set(hash_key, sort_key, value, 5000, ttl_seconds, nullptr);
+ ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
+ << ", sort_key=" << sort_key
+ << ", error=" << client->get_error_string(ret);
+ base[hash_key][sort_key] = value;
+ ttl_base[hash_key][sort_key] =
+ std::pair<std::string, uint32_t>(value, expire_ts_seconds);
+ }
+ }
+
ddebug("Database filled.");
}
@@ -416,6 +496,58 @@ TEST_F(scan, OVERALL)
compare(data, base);
}
+TEST_F(scan, REQUEST_EXPIRE_TS)
+{
+ ddebug("TEST REQUEST_EXPIRE_TS...");
+
+ pegasus_client::scan_options options;
+ options.return_expire_ts = true;
+ std::vector<pegasus_client::pegasus_scanner *> raw_scanners;
+ int ret = client->get_unordered_scanners(3, options, raw_scanners);
+ ASSERT_EQ(pegasus::PERR_OK, ret) << "Error occurred when getting scanner. error="
+ << client->get_error_string(ret);
+
+ std::vector<pegasus::pegasus_client::pegasus_scanner_wrapper> scanners;
+ for (auto raw_scanner : raw_scanners) {
+ ASSERT_NE(nullptr, raw_scanner);
+ scanners.push_back(raw_scanner->get_smart_wrapper());
+ }
+ raw_scanners.clear();
+ ASSERT_LE(scanners.size(), 3);
+
+ std::map<std::string, std::map<std::string, std::string>> data;
+ std::map<std::string, std::map<std::string, std::pair<std::string, uint32_t>>> ttl_data;
+ for (auto scanner : scanners) {
+ std::atomic_bool split_completed(false);
+ while (!split_completed.load()) {
+ dsn::utils::notify_event op_completed;
+ scanner->async_next([&](int err,
+ std::string &&hash_key,
+ std::string &&sort_key,
+ std::string &&value,
+ pegasus::pegasus_client::internal_info &&info,
+ uint32_t expire_ts_seconds) {
+ if (err == pegasus::PERR_OK) {
+ check_and_put(data, hash_key, sort_key, value);
+ if (expire_ts_seconds > 0) {
+ check_and_put(ttl_data, hash_key, sort_key, value, expire_ts_seconds);
+ }
+ } else if (err == pegasus::PERR_SCAN_COMPLETE) {
+ split_completed.store(true);
+ } else {
+ ASSERT_TRUE(false) << "Error occurred when scan. error="
+ << client->get_error_string(err);
+ }
+ op_completed.notify();
+ });
+ op_completed.wait();
+ }
+ }
+
+ compare(data, base);
+ compare(ttl_data, ttl_base);
+}
+
TEST_F(scan, ITERATION_TIME_LIMIT)
{
// update iteration threshold to 1ms
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org