You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2022/02/16 09:28:34 UTC
[incubator-pegasus] branch master updated: feat: add 'BATCH_GET' interface for read optimization (#897)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new dc4c710 feat: add 'BATCH_GET' interface for read optimization (#897)
dc4c710 is described below
commit dc4c710edead1f2806daa74462666c8cfbb94397
Author: cauchy1988 <ta...@163.com>
AuthorDate: Wed Feb 16 17:28:26 2022 +0800
feat: add 'BATCH_GET' interface for read optimization (#897)
---
src/base/rrdb_types.cpp | 1443 ++++++++++++++++++++---------
src/idl/rrdb.thrift | 24 +
src/include/rrdb/rrdb.client.h | 26 +
src/include/rrdb/rrdb.code.definition.h | 1 +
src/include/rrdb/rrdb_types.h | 242 ++++-
src/server/brief_stat.cpp | 2 +
src/server/capacity_unit_calculator.cpp | 30 +
src/server/capacity_unit_calculator.h | 4 +
src/server/config.ini | 9 +
src/server/config.min.ini | 7 +
src/server/pegasus_read_service.h | 10 +
src/server/pegasus_server_impl.cpp | 116 +++
src/server/pegasus_server_impl.h | 24 +
src/server/pegasus_server_impl_init.cpp | 24 +
src/shell/command_helper.h | 15 +-
src/shell/commands.h | 2 +
src/shell/commands/node_management.cpp | 10 +
src/shell/commands/table_management.cpp | 3 +
src/test/function_test/run.sh | 2 +
src/test/function_test/test_batch_get.cpp | 104 +++
20 files changed, 1654 insertions(+), 444 deletions(-)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 9eaa7e7..4ef46f3 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -2142,6 +2142,571 @@ void multi_get_response::printTo(std::ostream &out) const
out << ")";
}
+batch_get_request::~batch_get_request() throw() {} // Destructor with an empty body; throw() declares that it does not throw.
+
+void batch_get_request::__set_keys(const std::vector<full_key> &val) { this->keys = val; } // Copies val into keys; __isset is not modified here.
+
+uint32_t batch_get_request::read(::apache::thrift::protocol::TProtocol *iprot) // Decodes this struct from iprot; returns the accumulated results of the iprot read/skip calls.
+{
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ uint32_t xfer = 0; // Running total of the values returned by the iprot calls below.
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException; // Not referenced again within this function.
+
+ while (true) { // Consume one field per iteration until a T_STOP field type is read.
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid) {
+ case 1: // Field id 1: keys.
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->keys.clear(); // Discard previous contents before decoding.
+ uint32_t _size69;
+ ::apache::thrift::protocol::TType _etype72; // Element type is read but not checked below.
+ xfer += iprot->readListBegin(_etype72, _size69);
+ this->keys.resize(_size69); // Element count is taken directly from the input.
+ uint32_t _i73;
+ for (_i73 = 0; _i73 < _size69; ++_i73) {
+ xfer += this->keys[_i73].read(iprot); // Each full_key decodes itself in place.
+ }
+ xfer += iprot->readListEnd();
+ }
+ this->__isset.keys = true; // Record that keys was present in the input.
+ } else {
+ xfer += iprot->skip(ftype); // Field id matched but the wire type is not a list: skip the value.
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype); // Any other field id is skipped.
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t batch_get_request::write(::apache::thrift::protocol::TProtocol *oprot) const // Encodes this struct to oprot; returns the accumulated results of the oprot write calls.
+{
+ uint32_t xfer = 0; // Running total of the values returned by the oprot calls below.
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ xfer += oprot->writeStructBegin("batch_get_request");
+
+ xfer += oprot->writeFieldBegin("keys", ::apache::thrift::protocol::T_LIST, 1); // keys is written unconditionally as field id 1; __isset is not consulted.
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, // Elements are declared as structs.
+ static_cast<uint32_t>(this->keys.size())); // Element count is cast from size_t to uint32_t.
+ std::vector<full_key>::const_iterator _iter74;
+ for (_iter74 = this->keys.begin(); _iter74 != this->keys.end(); ++_iter74) {
+ xfer += (*_iter74).write(oprot); // Each full_key encodes itself.
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop(); // Stop marker that ends the field sequence.
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(batch_get_request &a, batch_get_request &b) // Exchanges the keys and __isset members of a and b.
+{
+ using ::std::swap; // Lets the unqualified swap calls below resolve to std::swap when no closer overload exists.
+ swap(a.keys, b.keys);
+ swap(a.__isset, b.__isset);
+}
+
+batch_get_request::batch_get_request(const batch_get_request &other75) // Copy constructor: members are assigned in the body rather than in an initializer list.
+{
+ keys = other75.keys;
+ __isset = other75.__isset;
+}
+batch_get_request::batch_get_request(batch_get_request &&other76) // Move constructor: members are move-assigned in the body; not declared noexcept.
+{
+ keys = std::move(other76.keys);
+ __isset = std::move(other76.__isset);
+}
+batch_get_request &batch_get_request::operator=(const batch_get_request &other77) // Copy assignment: copies keys and __isset; no self-assignment check is performed.
+{
+ keys = other77.keys;
+ __isset = other77.__isset;
+ return *this;
+}
+batch_get_request &batch_get_request::operator=(batch_get_request &&other78) // Move assignment: move-assigns keys and __isset; no self-assignment check is performed.
+{
+ keys = std::move(other78.keys);
+ __isset = std::move(other78.__isset);
+ return *this;
+}
+void batch_get_request::printTo(std::ostream &out) const // Streams a one-line textual form of this struct to out.
+{
+ using ::apache::thrift::to_string;
+ out << "batch_get_request(";
+ out << "keys=" << to_string(keys); // keys is rendered through apache::thrift::to_string.
+ out << ")";
+}
+
+full_key::~full_key() throw() {} // Destructor with an empty body; throw() declares that it does not throw.
+
+void full_key::__set_hash_key(const ::dsn::blob &val) { this->hash_key = val; } // Assigns val to hash_key; __isset is not modified here.
+
+void full_key::__set_sort_key(const ::dsn::blob &val) { this->sort_key = val; } // Assigns val to sort_key; __isset is not modified here.
+
+uint32_t full_key::read(::apache::thrift::protocol::TProtocol *iprot) // Decodes this struct from iprot; returns the accumulated results of the iprot read/skip calls.
+{
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ uint32_t xfer = 0; // Running total of the values returned by the iprot calls below.
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException; // Not referenced again within this function.
+
+ while (true) { // Consume one field per iteration until a T_STOP field type is read.
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid) {
+ case 1: // Field id 1: hash_key.
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->hash_key.read(iprot); // The blob decodes itself through its own read().
+ this->__isset.hash_key = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 2: // Field id 2: sort_key.
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->sort_key.read(iprot); // The blob decodes itself through its own read().
+ this->__isset.sort_key = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype); // Any other field id is skipped.
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t full_key::write(::apache::thrift::protocol::TProtocol *oprot) const // Encodes this struct to oprot; returns the accumulated results of the oprot write calls.
+{
+ uint32_t xfer = 0; // Running total of the values returned by the oprot calls below.
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ xfer += oprot->writeStructBegin("full_key");
+
+ xfer += oprot->writeFieldBegin("hash_key", ::apache::thrift::protocol::T_STRUCT, 1); // hash_key is written unconditionally as field id 1.
+ xfer += this->hash_key.write(oprot); // The blob encodes itself through its own write().
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("sort_key", ::apache::thrift::protocol::T_STRUCT, 2); // sort_key is written unconditionally as field id 2.
+ xfer += this->sort_key.write(oprot); // The blob encodes itself through its own write().
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop(); // Stop marker that ends the field sequence.
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(full_key &a, full_key &b) // Exchanges hash_key, sort_key and __isset of a and b.
+{
+ using ::std::swap; // Lets the unqualified swap calls below resolve to std::swap when no closer overload exists.
+ swap(a.hash_key, b.hash_key);
+ swap(a.sort_key, b.sort_key);
+ swap(a.__isset, b.__isset);
+}
+
+full_key::full_key(const full_key &other79) // Copy constructor: members are assigned in the body rather than in an initializer list.
+{
+ hash_key = other79.hash_key;
+ sort_key = other79.sort_key;
+ __isset = other79.__isset;
+}
+full_key::full_key(full_key &&other80) // Move constructor: members are move-assigned in the body; not declared noexcept.
+{
+ hash_key = std::move(other80.hash_key);
+ sort_key = std::move(other80.sort_key);
+ __isset = std::move(other80.__isset);
+}
+full_key &full_key::operator=(const full_key &other81) // Copy assignment: copies every member; no self-assignment check is performed.
+{
+ hash_key = other81.hash_key;
+ sort_key = other81.sort_key;
+ __isset = other81.__isset;
+ return *this;
+}
+full_key &full_key::operator=(full_key &&other82) // Move assignment: move-assigns every member; no self-assignment check is performed.
+{
+ hash_key = std::move(other82.hash_key);
+ sort_key = std::move(other82.sort_key);
+ __isset = std::move(other82.__isset);
+ return *this;
+}
+void full_key::printTo(std::ostream &out) const // Streams a one-line textual form of this struct to out.
+{
+ using ::apache::thrift::to_string;
+ out << "full_key(";
+ out << "hash_key=" << to_string(hash_key); // Each member is rendered through apache::thrift::to_string.
+ out << ", "
+ << "sort_key=" << to_string(sort_key);
+ out << ")";
+}
+
+batch_get_response::~batch_get_response() throw() {} // Destructor with an empty body; throw() declares that it does not throw.
+
+void batch_get_response::__set_error(const int32_t val) { this->error = val; } // Stores val in error; __isset is not modified here.
+
+void batch_get_response::__set_data(const std::vector<full_data> &val) { this->data = val; } // Copies val into data; __isset is not modified here.
+
+void batch_get_response::__set_app_id(const int32_t val) { this->app_id = val; } // Stores val in app_id; __isset is not modified here.
+
+void batch_get_response::__set_partition_index(const int32_t val) { this->partition_index = val; } // Stores val in partition_index; __isset is not modified here.
+
+void batch_get_response::__set_server(const std::string &val) { this->server = val; } // Copies val into server; __isset is not modified here.
+
+uint32_t batch_get_response::read(::apache::thrift::protocol::TProtocol *iprot) // Decodes this struct from iprot; returns the accumulated results of the iprot read/skip calls.
+{
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ uint32_t xfer = 0; // Running total of the values returned by the iprot calls below.
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException; // Not referenced again within this function.
+
+ while (true) { // Consume one field per iteration until a T_STOP field type is read.
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid) { // Handled ids are 1, 2, 3, 4 and 6; there is no case for 5, so it falls to default.
+ case 1: // Field id 1: error.
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ xfer += iprot->readI32(this->error);
+ this->__isset.error = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 2: // Field id 2: data.
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->data.clear(); // Discard previous contents before decoding.
+ uint32_t _size83;
+ ::apache::thrift::protocol::TType _etype86; // Element type is read but not checked below.
+ xfer += iprot->readListBegin(_etype86, _size83);
+ this->data.resize(_size83); // Element count is taken directly from the input.
+ uint32_t _i87;
+ for (_i87 = 0; _i87 < _size83; ++_i87) {
+ xfer += this->data[_i87].read(iprot); // Each full_data decodes itself in place.
+ }
+ xfer += iprot->readListEnd();
+ }
+ this->__isset.data = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 3: // Field id 3: app_id.
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ xfer += iprot->readI32(this->app_id);
+ this->__isset.app_id = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 4: // Field id 4: partition_index.
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ xfer += iprot->readI32(this->partition_index);
+ this->__isset.partition_index = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 6: // Field id 6: server.
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->server);
+ this->__isset.server = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype); // Any other field id is skipped.
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t batch_get_response::write(::apache::thrift::protocol::TProtocol *oprot) const // Encodes this struct to oprot; returns the accumulated results of the oprot write calls.
+{
+ uint32_t xfer = 0; // Running total of the values returned by the oprot calls below.
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ xfer += oprot->writeStructBegin("batch_get_response");
+
+ xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1); // All fields below are written unconditionally; __isset is not consulted.
+ xfer += oprot->writeI32(this->error);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("data", ::apache::thrift::protocol::T_LIST, 2);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, // Elements are declared as structs.
+ static_cast<uint32_t>(this->data.size())); // Element count is cast from size_t to uint32_t.
+ std::vector<full_data>::const_iterator _iter88;
+ for (_iter88 = this->data.begin(); _iter88 != this->data.end(); ++_iter88) {
+ xfer += (*_iter88).write(oprot); // Each full_data encodes itself.
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("app_id", ::apache::thrift::protocol::T_I32, 3);
+ xfer += oprot->writeI32(this->app_id);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("partition_index", ::apache::thrift::protocol::T_I32, 4);
+ xfer += oprot->writeI32(this->partition_index);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("server", ::apache::thrift::protocol::T_STRING, 6); // Field id jumps from 4 to 6, matching the ids handled in read().
+ xfer += oprot->writeString(this->server);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop(); // Stop marker that ends the field sequence.
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(batch_get_response &a, batch_get_response &b) // Exchanges every member of a and b, including __isset.
+{
+ using ::std::swap; // Lets the unqualified swap calls below resolve to std::swap when no closer overload exists.
+ swap(a.error, b.error);
+ swap(a.data, b.data);
+ swap(a.app_id, b.app_id);
+ swap(a.partition_index, b.partition_index);
+ swap(a.server, b.server);
+ swap(a.__isset, b.__isset);
+}
+
+batch_get_response::batch_get_response(const batch_get_response &other89) // Copy constructor: members are assigned in the body rather than in an initializer list.
+{
+ error = other89.error;
+ data = other89.data;
+ app_id = other89.app_id;
+ partition_index = other89.partition_index;
+ server = other89.server;
+ __isset = other89.__isset;
+}
+batch_get_response::batch_get_response(batch_get_response &&other90) // Move constructor: members are move-assigned in the body; not declared noexcept.
+{
+ error = std::move(other90.error);
+ data = std::move(other90.data);
+ app_id = std::move(other90.app_id);
+ partition_index = std::move(other90.partition_index);
+ server = std::move(other90.server);
+ __isset = std::move(other90.__isset);
+}
+batch_get_response &batch_get_response::operator=(const batch_get_response &other91) // Copy assignment: copies every member; no self-assignment check is performed.
+{
+ error = other91.error;
+ data = other91.data;
+ app_id = other91.app_id;
+ partition_index = other91.partition_index;
+ server = other91.server;
+ __isset = other91.__isset;
+ return *this;
+}
+batch_get_response &batch_get_response::operator=(batch_get_response &&other92) // Move assignment: move-assigns every member; no self-assignment check is performed.
+{
+ error = std::move(other92.error);
+ data = std::move(other92.data);
+ app_id = std::move(other92.app_id);
+ partition_index = std::move(other92.partition_index);
+ server = std::move(other92.server);
+ __isset = std::move(other92.__isset);
+ return *this;
+}
+void batch_get_response::printTo(std::ostream &out) const // Streams a one-line textual form of this struct to out.
+{
+ using ::apache::thrift::to_string;
+ out << "batch_get_response(";
+ out << "error=" << to_string(error); // Each member is rendered through apache::thrift::to_string.
+ out << ", "
+ << "data=" << to_string(data);
+ out << ", "
+ << "app_id=" << to_string(app_id);
+ out << ", "
+ << "partition_index=" << to_string(partition_index);
+ out << ", "
+ << "server=" << to_string(server);
+ out << ")";
+}
+
+full_data::~full_data() throw() {} // Destructor with an empty body; throw() declares that it does not throw.
+
+void full_data::__set_hash_key(const ::dsn::blob &val) { this->hash_key = val; } // Assigns val to hash_key; __isset is not modified here.
+
+void full_data::__set_sort_key(const ::dsn::blob &val) { this->sort_key = val; } // Assigns val to sort_key; __isset is not modified here.
+
+void full_data::__set_value(const ::dsn::blob &val) { this->value = val; } // Assigns val to value; __isset is not modified here.
+
+uint32_t full_data::read(::apache::thrift::protocol::TProtocol *iprot) // Decodes this struct from iprot; returns the accumulated results of the iprot read/skip calls.
+{
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ uint32_t xfer = 0; // Running total of the values returned by the iprot calls below.
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException; // Not referenced again within this function.
+
+ while (true) { // Consume one field per iteration until a T_STOP field type is read.
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid) {
+ case 1: // Field id 1: hash_key.
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->hash_key.read(iprot); // The blob decodes itself through its own read().
+ this->__isset.hash_key = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 2: // Field id 2: sort_key.
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->sort_key.read(iprot); // The blob decodes itself through its own read().
+ this->__isset.sort_key = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ case 3: // Field id 3: value.
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->value.read(iprot); // The blob decodes itself through its own read().
+ this->__isset.value = true;
+ } else {
+ xfer += iprot->skip(ftype); // Wire type mismatch: skip the value.
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype); // Any other field id is skipped.
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t full_data::write(::apache::thrift::protocol::TProtocol *oprot) const // Encodes this struct to oprot; returns the accumulated results of the oprot write calls.
+{
+ uint32_t xfer = 0; // Running total of the values returned by the oprot calls below.
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); // NOTE(review): presumably tracks nesting depth for the duration of this call; confirm against Thrift.
+ xfer += oprot->writeStructBegin("full_data");
+
+ xfer += oprot->writeFieldBegin("hash_key", ::apache::thrift::protocol::T_STRUCT, 1); // All three fields are written unconditionally; __isset is not consulted.
+ xfer += this->hash_key.write(oprot); // The blob encodes itself through its own write().
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("sort_key", ::apache::thrift::protocol::T_STRUCT, 2);
+ xfer += this->sort_key.write(oprot);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("value", ::apache::thrift::protocol::T_STRUCT, 3);
+ xfer += this->value.write(oprot);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop(); // Stop marker that ends the field sequence.
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(full_data &a, full_data &b) // Exchanges hash_key, sort_key, value and __isset of a and b.
+{
+ using ::std::swap; // Lets the unqualified swap calls below resolve to std::swap when no closer overload exists.
+ swap(a.hash_key, b.hash_key);
+ swap(a.sort_key, b.sort_key);
+ swap(a.value, b.value);
+ swap(a.__isset, b.__isset);
+}
+
+full_data::full_data(const full_data &other93) // Copy constructor: members are assigned in the body rather than in an initializer list.
+{
+ hash_key = other93.hash_key;
+ sort_key = other93.sort_key;
+ value = other93.value;
+ __isset = other93.__isset;
+}
+full_data::full_data(full_data &&other94) // Move constructor: members are move-assigned in the body; not declared noexcept.
+{
+ hash_key = std::move(other94.hash_key);
+ sort_key = std::move(other94.sort_key);
+ value = std::move(other94.value);
+ __isset = std::move(other94.__isset);
+}
+full_data &full_data::operator=(const full_data &other95) // Copy assignment: copies every member; no self-assignment check is performed.
+{
+ hash_key = other95.hash_key;
+ sort_key = other95.sort_key;
+ value = other95.value;
+ __isset = other95.__isset;
+ return *this;
+}
+full_data &full_data::operator=(full_data &&other96) // Move assignment: move-assigns every member; no self-assignment check is performed.
+{
+ hash_key = std::move(other96.hash_key);
+ sort_key = std::move(other96.sort_key);
+ value = std::move(other96.value);
+ __isset = std::move(other96.__isset);
+ return *this;
+}
+void full_data::printTo(std::ostream &out) const // Streams a one-line textual form of this struct to out.
+{
+ using ::apache::thrift::to_string;
+ out << "full_data(";
+ out << "hash_key=" << to_string(hash_key); // Each member is rendered through apache::thrift::to_string.
+ out << ", "
+ << "sort_key=" << to_string(sort_key);
+ out << ", "
+ << "value=" << to_string(value);
+ out << ")";
+}
+
incr_request::~incr_request() throw() {}
void incr_request::__set_key(const ::dsn::blob &val) { this->key = val; }
@@ -2237,34 +2802,34 @@ void swap(incr_request &a, incr_request &b)
swap(a.__isset, b.__isset);
}
-incr_request::incr_request(const incr_request &other69)
+incr_request::incr_request(const incr_request &other97)
{
- key = other69.key;
- increment = other69.increment;
- expire_ts_seconds = other69.expire_ts_seconds;
- __isset = other69.__isset;
+ key = other97.key;
+ increment = other97.increment;
+ expire_ts_seconds = other97.expire_ts_seconds;
+ __isset = other97.__isset;
}
-incr_request::incr_request(incr_request &&other70)
+incr_request::incr_request(incr_request &&other98)
{
- key = std::move(other70.key);
- increment = std::move(other70.increment);
- expire_ts_seconds = std::move(other70.expire_ts_seconds);
- __isset = std::move(other70.__isset);
+ key = std::move(other98.key);
+ increment = std::move(other98.increment);
+ expire_ts_seconds = std::move(other98.expire_ts_seconds);
+ __isset = std::move(other98.__isset);
}
-incr_request &incr_request::operator=(const incr_request &other71)
+incr_request &incr_request::operator=(const incr_request &other99)
{
- key = other71.key;
- increment = other71.increment;
- expire_ts_seconds = other71.expire_ts_seconds;
- __isset = other71.__isset;
+ key = other99.key;
+ increment = other99.increment;
+ expire_ts_seconds = other99.expire_ts_seconds;
+ __isset = other99.__isset;
return *this;
}
-incr_request &incr_request::operator=(incr_request &&other72)
+incr_request &incr_request::operator=(incr_request &&other100)
{
- key = std::move(other72.key);
- increment = std::move(other72.increment);
- expire_ts_seconds = std::move(other72.expire_ts_seconds);
- __isset = std::move(other72.__isset);
+ key = std::move(other100.key);
+ increment = std::move(other100.increment);
+ expire_ts_seconds = std::move(other100.expire_ts_seconds);
+ __isset = std::move(other100.__isset);
return *this;
}
void incr_request::printTo(std::ostream &out) const
@@ -2419,46 +2984,46 @@ void swap(incr_response &a, incr_response &b)
swap(a.__isset, b.__isset);
}
-incr_response::incr_response(const incr_response &other73)
+incr_response::incr_response(const incr_response &other101)
{
- error = other73.error;
- new_value = other73.new_value;
- app_id = other73.app_id;
- partition_index = other73.partition_index;
- decree = other73.decree;
- server = other73.server;
- __isset = other73.__isset;
+ error = other101.error;
+ new_value = other101.new_value;
+ app_id = other101.app_id;
+ partition_index = other101.partition_index;
+ decree = other101.decree;
+ server = other101.server;
+ __isset = other101.__isset;
}
-incr_response::incr_response(incr_response &&other74)
+incr_response::incr_response(incr_response &&other102)
{
- error = std::move(other74.error);
- new_value = std::move(other74.new_value);
- app_id = std::move(other74.app_id);
- partition_index = std::move(other74.partition_index);
- decree = std::move(other74.decree);
- server = std::move(other74.server);
- __isset = std::move(other74.__isset);
+ error = std::move(other102.error);
+ new_value = std::move(other102.new_value);
+ app_id = std::move(other102.app_id);
+ partition_index = std::move(other102.partition_index);
+ decree = std::move(other102.decree);
+ server = std::move(other102.server);
+ __isset = std::move(other102.__isset);
}
-incr_response &incr_response::operator=(const incr_response &other75)
+incr_response &incr_response::operator=(const incr_response &other103)
{
- error = other75.error;
- new_value = other75.new_value;
- app_id = other75.app_id;
- partition_index = other75.partition_index;
- decree = other75.decree;
- server = other75.server;
- __isset = other75.__isset;
+ error = other103.error;
+ new_value = other103.new_value;
+ app_id = other103.app_id;
+ partition_index = other103.partition_index;
+ decree = other103.decree;
+ server = other103.server;
+ __isset = other103.__isset;
return *this;
}
-incr_response &incr_response::operator=(incr_response &&other76)
+incr_response &incr_response::operator=(incr_response &&other104)
{
- error = std::move(other76.error);
- new_value = std::move(other76.new_value);
- app_id = std::move(other76.app_id);
- partition_index = std::move(other76.partition_index);
- decree = std::move(other76.decree);
- server = std::move(other76.server);
- __isset = std::move(other76.__isset);
+ error = std::move(other104.error);
+ new_value = std::move(other104.new_value);
+ app_id = std::move(other104.app_id);
+ partition_index = std::move(other104.partition_index);
+ decree = std::move(other104.decree);
+ server = std::move(other104.server);
+ __isset = std::move(other104.__isset);
return *this;
}
void incr_response::printTo(std::ostream &out) const
@@ -2554,9 +3119,9 @@ uint32_t check_and_set_request::read(::apache::thrift::protocol::TProtocol *ipro
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast77;
- xfer += iprot->readI32(ecast77);
- this->check_type = (cas_check_type::type)ecast77;
+ int32_t ecast105;
+ xfer += iprot->readI32(ecast105);
+ this->check_type = (cas_check_type::type)ecast105;
this->__isset.check_type = true;
} else {
xfer += iprot->skip(ftype);
@@ -2684,58 +3249,58 @@ void swap(check_and_set_request &a, check_and_set_request &b)
swap(a.__isset, b.__isset);
}
-check_and_set_request::check_and_set_request(const check_and_set_request &other78)
-{
- hash_key = other78.hash_key;
- check_sort_key = other78.check_sort_key;
- check_type = other78.check_type;
- check_operand = other78.check_operand;
- set_diff_sort_key = other78.set_diff_sort_key;
- set_sort_key = other78.set_sort_key;
- set_value = other78.set_value;
- set_expire_ts_seconds = other78.set_expire_ts_seconds;
- return_check_value = other78.return_check_value;
- __isset = other78.__isset;
-}
-check_and_set_request::check_and_set_request(check_and_set_request &&other79)
-{
- hash_key = std::move(other79.hash_key);
- check_sort_key = std::move(other79.check_sort_key);
- check_type = std::move(other79.check_type);
- check_operand = std::move(other79.check_operand);
- set_diff_sort_key = std::move(other79.set_diff_sort_key);
- set_sort_key = std::move(other79.set_sort_key);
- set_value = std::move(other79.set_value);
- set_expire_ts_seconds = std::move(other79.set_expire_ts_seconds);
- return_check_value = std::move(other79.return_check_value);
- __isset = std::move(other79.__isset);
-}
-check_and_set_request &check_and_set_request::operator=(const check_and_set_request &other80)
-{
- hash_key = other80.hash_key;
- check_sort_key = other80.check_sort_key;
- check_type = other80.check_type;
- check_operand = other80.check_operand;
- set_diff_sort_key = other80.set_diff_sort_key;
- set_sort_key = other80.set_sort_key;
- set_value = other80.set_value;
- set_expire_ts_seconds = other80.set_expire_ts_seconds;
- return_check_value = other80.return_check_value;
- __isset = other80.__isset;
+check_and_set_request::check_and_set_request(const check_and_set_request &other106)
+{
+ hash_key = other106.hash_key;
+ check_sort_key = other106.check_sort_key;
+ check_type = other106.check_type;
+ check_operand = other106.check_operand;
+ set_diff_sort_key = other106.set_diff_sort_key;
+ set_sort_key = other106.set_sort_key;
+ set_value = other106.set_value;
+ set_expire_ts_seconds = other106.set_expire_ts_seconds;
+ return_check_value = other106.return_check_value;
+ __isset = other106.__isset;
+}
+check_and_set_request::check_and_set_request(check_and_set_request &&other107)
+{
+ hash_key = std::move(other107.hash_key);
+ check_sort_key = std::move(other107.check_sort_key);
+ check_type = std::move(other107.check_type);
+ check_operand = std::move(other107.check_operand);
+ set_diff_sort_key = std::move(other107.set_diff_sort_key);
+ set_sort_key = std::move(other107.set_sort_key);
+ set_value = std::move(other107.set_value);
+ set_expire_ts_seconds = std::move(other107.set_expire_ts_seconds);
+ return_check_value = std::move(other107.return_check_value);
+ __isset = std::move(other107.__isset);
+}
+check_and_set_request &check_and_set_request::operator=(const check_and_set_request &other108)
+{
+ hash_key = other108.hash_key;
+ check_sort_key = other108.check_sort_key;
+ check_type = other108.check_type;
+ check_operand = other108.check_operand;
+ set_diff_sort_key = other108.set_diff_sort_key;
+ set_sort_key = other108.set_sort_key;
+ set_value = other108.set_value;
+ set_expire_ts_seconds = other108.set_expire_ts_seconds;
+ return_check_value = other108.return_check_value;
+ __isset = other108.__isset;
return *this;
}
-check_and_set_request &check_and_set_request::operator=(check_and_set_request &&other81)
-{
- hash_key = std::move(other81.hash_key);
- check_sort_key = std::move(other81.check_sort_key);
- check_type = std::move(other81.check_type);
- check_operand = std::move(other81.check_operand);
- set_diff_sort_key = std::move(other81.set_diff_sort_key);
- set_sort_key = std::move(other81.set_sort_key);
- set_value = std::move(other81.set_value);
- set_expire_ts_seconds = std::move(other81.set_expire_ts_seconds);
- return_check_value = std::move(other81.return_check_value);
- __isset = std::move(other81.__isset);
+check_and_set_request &check_and_set_request::operator=(check_and_set_request &&other109)
+{
+ hash_key = std::move(other109.hash_key);
+ check_sort_key = std::move(other109.check_sort_key);
+ check_type = std::move(other109.check_type);
+ check_operand = std::move(other109.check_operand);
+ set_diff_sort_key = std::move(other109.set_diff_sort_key);
+ set_sort_key = std::move(other109.set_sort_key);
+ set_value = std::move(other109.set_value);
+ set_expire_ts_seconds = std::move(other109.set_expire_ts_seconds);
+ return_check_value = std::move(other109.return_check_value);
+ __isset = std::move(other109.__isset);
return *this;
}
void check_and_set_request::printTo(std::ostream &out) const
@@ -2941,54 +3506,54 @@ void swap(check_and_set_response &a, check_and_set_response &b)
swap(a.__isset, b.__isset);
}
-check_and_set_response::check_and_set_response(const check_and_set_response &other82)
-{
- error = other82.error;
- check_value_returned = other82.check_value_returned;
- check_value_exist = other82.check_value_exist;
- check_value = other82.check_value;
- app_id = other82.app_id;
- partition_index = other82.partition_index;
- decree = other82.decree;
- server = other82.server;
- __isset = other82.__isset;
-}
-check_and_set_response::check_and_set_response(check_and_set_response &&other83)
-{
- error = std::move(other83.error);
- check_value_returned = std::move(other83.check_value_returned);
- check_value_exist = std::move(other83.check_value_exist);
- check_value = std::move(other83.check_value);
- app_id = std::move(other83.app_id);
- partition_index = std::move(other83.partition_index);
- decree = std::move(other83.decree);
- server = std::move(other83.server);
- __isset = std::move(other83.__isset);
-}
-check_and_set_response &check_and_set_response::operator=(const check_and_set_response &other84)
-{
- error = other84.error;
- check_value_returned = other84.check_value_returned;
- check_value_exist = other84.check_value_exist;
- check_value = other84.check_value;
- app_id = other84.app_id;
- partition_index = other84.partition_index;
- decree = other84.decree;
- server = other84.server;
- __isset = other84.__isset;
+check_and_set_response::check_and_set_response(const check_and_set_response &other110)
+{
+ error = other110.error;
+ check_value_returned = other110.check_value_returned;
+ check_value_exist = other110.check_value_exist;
+ check_value = other110.check_value;
+ app_id = other110.app_id;
+ partition_index = other110.partition_index;
+ decree = other110.decree;
+ server = other110.server;
+ __isset = other110.__isset;
+}
+check_and_set_response::check_and_set_response(check_and_set_response &&other111)
+{
+ error = std::move(other111.error);
+ check_value_returned = std::move(other111.check_value_returned);
+ check_value_exist = std::move(other111.check_value_exist);
+ check_value = std::move(other111.check_value);
+ app_id = std::move(other111.app_id);
+ partition_index = std::move(other111.partition_index);
+ decree = std::move(other111.decree);
+ server = std::move(other111.server);
+ __isset = std::move(other111.__isset);
+}
+check_and_set_response &check_and_set_response::operator=(const check_and_set_response &other112)
+{
+ error = other112.error;
+ check_value_returned = other112.check_value_returned;
+ check_value_exist = other112.check_value_exist;
+ check_value = other112.check_value;
+ app_id = other112.app_id;
+ partition_index = other112.partition_index;
+ decree = other112.decree;
+ server = other112.server;
+ __isset = other112.__isset;
return *this;
}
-check_and_set_response &check_and_set_response::operator=(check_and_set_response &&other85)
-{
- error = std::move(other85.error);
- check_value_returned = std::move(other85.check_value_returned);
- check_value_exist = std::move(other85.check_value_exist);
- check_value = std::move(other85.check_value);
- app_id = std::move(other85.app_id);
- partition_index = std::move(other85.partition_index);
- decree = std::move(other85.decree);
- server = std::move(other85.server);
- __isset = std::move(other85.__isset);
+check_and_set_response &check_and_set_response::operator=(check_and_set_response &&other113)
+{
+ error = std::move(other113.error);
+ check_value_returned = std::move(other113.check_value_returned);
+ check_value_exist = std::move(other113.check_value_exist);
+ check_value = std::move(other113.check_value);
+ app_id = std::move(other113.app_id);
+ partition_index = std::move(other113.partition_index);
+ decree = std::move(other113.decree);
+ server = std::move(other113.server);
+ __isset = std::move(other113.__isset);
return *this;
}
void check_and_set_response::printTo(std::ostream &out) const
@@ -3044,9 +3609,9 @@ uint32_t mutate::read(::apache::thrift::protocol::TProtocol *iprot)
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast86;
- xfer += iprot->readI32(ecast86);
- this->operation = (mutate_operation::type)ecast86;
+ int32_t ecast114;
+ xfer += iprot->readI32(ecast114);
+ this->operation = (mutate_operation::type)ecast114;
this->__isset.operation = true;
} else {
xfer += iprot->skip(ftype);
@@ -3125,38 +3690,38 @@ void swap(mutate &a, mutate &b)
swap(a.__isset, b.__isset);
}
-mutate::mutate(const mutate &other87)
+mutate::mutate(const mutate &other115)
{
- operation = other87.operation;
- sort_key = other87.sort_key;
- value = other87.value;
- set_expire_ts_seconds = other87.set_expire_ts_seconds;
- __isset = other87.__isset;
+ operation = other115.operation;
+ sort_key = other115.sort_key;
+ value = other115.value;
+ set_expire_ts_seconds = other115.set_expire_ts_seconds;
+ __isset = other115.__isset;
}
-mutate::mutate(mutate &&other88)
+mutate::mutate(mutate &&other116)
{
- operation = std::move(other88.operation);
- sort_key = std::move(other88.sort_key);
- value = std::move(other88.value);
- set_expire_ts_seconds = std::move(other88.set_expire_ts_seconds);
- __isset = std::move(other88.__isset);
+ operation = std::move(other116.operation);
+ sort_key = std::move(other116.sort_key);
+ value = std::move(other116.value);
+ set_expire_ts_seconds = std::move(other116.set_expire_ts_seconds);
+ __isset = std::move(other116.__isset);
}
-mutate &mutate::operator=(const mutate &other89)
+mutate &mutate::operator=(const mutate &other117)
{
- operation = other89.operation;
- sort_key = other89.sort_key;
- value = other89.value;
- set_expire_ts_seconds = other89.set_expire_ts_seconds;
- __isset = other89.__isset;
+ operation = other117.operation;
+ sort_key = other117.sort_key;
+ value = other117.value;
+ set_expire_ts_seconds = other117.set_expire_ts_seconds;
+ __isset = other117.__isset;
return *this;
}
-mutate &mutate::operator=(mutate &&other90)
+mutate &mutate::operator=(mutate &&other118)
{
- operation = std::move(other90.operation);
- sort_key = std::move(other90.sort_key);
- value = std::move(other90.value);
- set_expire_ts_seconds = std::move(other90.set_expire_ts_seconds);
- __isset = std::move(other90.__isset);
+ operation = std::move(other118.operation);
+ sort_key = std::move(other118.sort_key);
+ value = std::move(other118.value);
+ set_expire_ts_seconds = std::move(other118.set_expire_ts_seconds);
+ __isset = std::move(other118.__isset);
return *this;
}
void mutate::printTo(std::ostream &out) const
@@ -3239,9 +3804,9 @@ uint32_t check_and_mutate_request::read(::apache::thrift::protocol::TProtocol *i
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast91;
- xfer += iprot->readI32(ecast91);
- this->check_type = (cas_check_type::type)ecast91;
+ int32_t ecast119;
+ xfer += iprot->readI32(ecast119);
+ this->check_type = (cas_check_type::type)ecast119;
this->__isset.check_type = true;
} else {
xfer += iprot->skip(ftype);
@@ -3259,13 +3824,13 @@ uint32_t check_and_mutate_request::read(::apache::thrift::protocol::TProtocol *i
if (ftype == ::apache::thrift::protocol::T_LIST) {
{
this->mutate_list.clear();
- uint32_t _size92;
- ::apache::thrift::protocol::TType _etype95;
- xfer += iprot->readListBegin(_etype95, _size92);
- this->mutate_list.resize(_size92);
- uint32_t _i96;
- for (_i96 = 0; _i96 < _size92; ++_i96) {
- xfer += this->mutate_list[_i96].read(iprot);
+ uint32_t _size120;
+ ::apache::thrift::protocol::TType _etype123;
+ xfer += iprot->readListBegin(_etype123, _size120);
+ this->mutate_list.resize(_size120);
+ uint32_t _i124;
+ for (_i124 = 0; _i124 < _size120; ++_i124) {
+ xfer += this->mutate_list[_i124].read(iprot);
}
xfer += iprot->readListEnd();
}
@@ -3320,9 +3885,10 @@ uint32_t check_and_mutate_request::write(::apache::thrift::protocol::TProtocol *
{
xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT,
static_cast<uint32_t>(this->mutate_list.size()));
- std::vector<mutate>::const_iterator _iter97;
- for (_iter97 = this->mutate_list.begin(); _iter97 != this->mutate_list.end(); ++_iter97) {
- xfer += (*_iter97).write(oprot);
+ std::vector<mutate>::const_iterator _iter125;
+ for (_iter125 = this->mutate_list.begin(); _iter125 != this->mutate_list.end();
+ ++_iter125) {
+ xfer += (*_iter125).write(oprot);
}
xfer += oprot->writeListEnd();
}
@@ -3349,47 +3915,47 @@ void swap(check_and_mutate_request &a, check_and_mutate_request &b)
swap(a.__isset, b.__isset);
}
-check_and_mutate_request::check_and_mutate_request(const check_and_mutate_request &other98)
+check_and_mutate_request::check_and_mutate_request(const check_and_mutate_request &other126)
{
- hash_key = other98.hash_key;
- check_sort_key = other98.check_sort_key;
- check_type = other98.check_type;
- check_operand = other98.check_operand;
- mutate_list = other98.mutate_list;
- return_check_value = other98.return_check_value;
- __isset = other98.__isset;
+ hash_key = other126.hash_key;
+ check_sort_key = other126.check_sort_key;
+ check_type = other126.check_type;
+ check_operand = other126.check_operand;
+ mutate_list = other126.mutate_list;
+ return_check_value = other126.return_check_value;
+ __isset = other126.__isset;
}
-check_and_mutate_request::check_and_mutate_request(check_and_mutate_request &&other99)
+check_and_mutate_request::check_and_mutate_request(check_and_mutate_request &&other127)
{
- hash_key = std::move(other99.hash_key);
- check_sort_key = std::move(other99.check_sort_key);
- check_type = std::move(other99.check_type);
- check_operand = std::move(other99.check_operand);
- mutate_list = std::move(other99.mutate_list);
- return_check_value = std::move(other99.return_check_value);
- __isset = std::move(other99.__isset);
+ hash_key = std::move(other127.hash_key);
+ check_sort_key = std::move(other127.check_sort_key);
+ check_type = std::move(other127.check_type);
+ check_operand = std::move(other127.check_operand);
+ mutate_list = std::move(other127.mutate_list);
+ return_check_value = std::move(other127.return_check_value);
+ __isset = std::move(other127.__isset);
}
check_and_mutate_request &check_and_mutate_request::
-operator=(const check_and_mutate_request &other100)
-{
- hash_key = other100.hash_key;
- check_sort_key = other100.check_sort_key;
- check_type = other100.check_type;
- check_operand = other100.check_operand;
- mutate_list = other100.mutate_list;
- return_check_value = other100.return_check_value;
- __isset = other100.__isset;
+operator=(const check_and_mutate_request &other128)
+{
+ hash_key = other128.hash_key;
+ check_sort_key = other128.check_sort_key;
+ check_type = other128.check_type;
+ check_operand = other128.check_operand;
+ mutate_list = other128.mutate_list;
+ return_check_value = other128.return_check_value;
+ __isset = other128.__isset;
return *this;
}
-check_and_mutate_request &check_and_mutate_request::operator=(check_and_mutate_request &&other101)
+check_and_mutate_request &check_and_mutate_request::operator=(check_and_mutate_request &&other129)
{
- hash_key = std::move(other101.hash_key);
- check_sort_key = std::move(other101.check_sort_key);
- check_type = std::move(other101.check_type);
- check_operand = std::move(other101.check_operand);
- mutate_list = std::move(other101.mutate_list);
- return_check_value = std::move(other101.return_check_value);
- __isset = std::move(other101.__isset);
+ hash_key = std::move(other129.hash_key);
+ check_sort_key = std::move(other129.check_sort_key);
+ check_type = std::move(other129.check_type);
+ check_operand = std::move(other129.check_operand);
+ mutate_list = std::move(other129.mutate_list);
+ return_check_value = std::move(other129.return_check_value);
+ __isset = std::move(other129.__isset);
return *this;
}
void check_and_mutate_request::printTo(std::ostream &out) const
@@ -3592,56 +4158,56 @@ void swap(check_and_mutate_response &a, check_and_mutate_response &b)
swap(a.__isset, b.__isset);
}
-check_and_mutate_response::check_and_mutate_response(const check_and_mutate_response &other102)
+check_and_mutate_response::check_and_mutate_response(const check_and_mutate_response &other130)
{
- error = other102.error;
- check_value_returned = other102.check_value_returned;
- check_value_exist = other102.check_value_exist;
- check_value = other102.check_value;
- app_id = other102.app_id;
- partition_index = other102.partition_index;
- decree = other102.decree;
- server = other102.server;
- __isset = other102.__isset;
+ error = other130.error;
+ check_value_returned = other130.check_value_returned;
+ check_value_exist = other130.check_value_exist;
+ check_value = other130.check_value;
+ app_id = other130.app_id;
+ partition_index = other130.partition_index;
+ decree = other130.decree;
+ server = other130.server;
+ __isset = other130.__isset;
}
-check_and_mutate_response::check_and_mutate_response(check_and_mutate_response &&other103)
+check_and_mutate_response::check_and_mutate_response(check_and_mutate_response &&other131)
{
- error = std::move(other103.error);
- check_value_returned = std::move(other103.check_value_returned);
- check_value_exist = std::move(other103.check_value_exist);
- check_value = std::move(other103.check_value);
- app_id = std::move(other103.app_id);
- partition_index = std::move(other103.partition_index);
- decree = std::move(other103.decree);
- server = std::move(other103.server);
- __isset = std::move(other103.__isset);
+ error = std::move(other131.error);
+ check_value_returned = std::move(other131.check_value_returned);
+ check_value_exist = std::move(other131.check_value_exist);
+ check_value = std::move(other131.check_value);
+ app_id = std::move(other131.app_id);
+ partition_index = std::move(other131.partition_index);
+ decree = std::move(other131.decree);
+ server = std::move(other131.server);
+ __isset = std::move(other131.__isset);
}
check_and_mutate_response &check_and_mutate_response::
-operator=(const check_and_mutate_response &other104)
-{
- error = other104.error;
- check_value_returned = other104.check_value_returned;
- check_value_exist = other104.check_value_exist;
- check_value = other104.check_value;
- app_id = other104.app_id;
- partition_index = other104.partition_index;
- decree = other104.decree;
- server = other104.server;
- __isset = other104.__isset;
+operator=(const check_and_mutate_response &other132)
+{
+ error = other132.error;
+ check_value_returned = other132.check_value_returned;
+ check_value_exist = other132.check_value_exist;
+ check_value = other132.check_value;
+ app_id = other132.app_id;
+ partition_index = other132.partition_index;
+ decree = other132.decree;
+ server = other132.server;
+ __isset = other132.__isset;
return *this;
}
check_and_mutate_response &check_and_mutate_response::
-operator=(check_and_mutate_response &&other105)
-{
- error = std::move(other105.error);
- check_value_returned = std::move(other105.check_value_returned);
- check_value_exist = std::move(other105.check_value_exist);
- check_value = std::move(other105.check_value);
- app_id = std::move(other105.app_id);
- partition_index = std::move(other105.partition_index);
- decree = std::move(other105.decree);
- server = std::move(other105.server);
- __isset = std::move(other105.__isset);
+operator=(check_and_mutate_response &&other133)
+{
+ error = std::move(other133.error);
+ check_value_returned = std::move(other133.check_value_returned);
+ check_value_exist = std::move(other133.check_value_exist);
+ check_value = std::move(other133.check_value);
+ app_id = std::move(other133.app_id);
+ partition_index = std::move(other133.partition_index);
+ decree = std::move(other133.decree);
+ server = std::move(other133.server);
+ __isset = std::move(other133.__isset);
return *this;
}
void check_and_mutate_response::printTo(std::ostream &out) const
@@ -3787,9 +4353,9 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
break;
case 7:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast106;
- xfer += iprot->readI32(ecast106);
- this->hash_key_filter_type = (filter_type::type)ecast106;
+ int32_t ecast134;
+ xfer += iprot->readI32(ecast134);
+ this->hash_key_filter_type = (filter_type::type)ecast134;
this->__isset.hash_key_filter_type = true;
} else {
xfer += iprot->skip(ftype);
@@ -3805,9 +4371,9 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot)
break;
case 9:
if (ftype == ::apache::thrift::protocol::T_I32) {
- int32_t ecast107;
- xfer += iprot->readI32(ecast107);
- this->sort_key_filter_type = (filter_type::type)ecast107;
+ int32_t ecast135;
+ xfer += iprot->readI32(ecast135);
+ this->sort_key_filter_type = (filter_type::type)ecast135;
this->__isset.sort_key_filter_type = true;
} else {
xfer += iprot->skip(ftype);
@@ -3945,74 +4511,74 @@ void swap(get_scanner_request &a, get_scanner_request &b)
swap(a.__isset, b.__isset);
}
-get_scanner_request::get_scanner_request(const get_scanner_request &other108)
-{
- start_key = other108.start_key;
- stop_key = other108.stop_key;
- start_inclusive = other108.start_inclusive;
- stop_inclusive = other108.stop_inclusive;
- batch_size = other108.batch_size;
- no_value = other108.no_value;
- hash_key_filter_type = other108.hash_key_filter_type;
- hash_key_filter_pattern = other108.hash_key_filter_pattern;
- sort_key_filter_type = other108.sort_key_filter_type;
- sort_key_filter_pattern = other108.sort_key_filter_pattern;
- validate_partition_hash = other108.validate_partition_hash;
- return_expire_ts = other108.return_expire_ts;
- full_scan = other108.full_scan;
- __isset = other108.__isset;
-}
-get_scanner_request::get_scanner_request(get_scanner_request &&other109)
-{
- start_key = std::move(other109.start_key);
- stop_key = std::move(other109.stop_key);
- start_inclusive = std::move(other109.start_inclusive);
- stop_inclusive = std::move(other109.stop_inclusive);
- batch_size = std::move(other109.batch_size);
- no_value = std::move(other109.no_value);
- hash_key_filter_type = std::move(other109.hash_key_filter_type);
- hash_key_filter_pattern = std::move(other109.hash_key_filter_pattern);
- sort_key_filter_type = std::move(other109.sort_key_filter_type);
- sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern);
- validate_partition_hash = std::move(other109.validate_partition_hash);
- return_expire_ts = std::move(other109.return_expire_ts);
- full_scan = std::move(other109.full_scan);
- __isset = std::move(other109.__isset);
-}
-get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110)
-{
- start_key = other110.start_key;
- stop_key = other110.stop_key;
- start_inclusive = other110.start_inclusive;
- stop_inclusive = other110.stop_inclusive;
- batch_size = other110.batch_size;
- no_value = other110.no_value;
- hash_key_filter_type = other110.hash_key_filter_type;
- hash_key_filter_pattern = other110.hash_key_filter_pattern;
- sort_key_filter_type = other110.sort_key_filter_type;
- sort_key_filter_pattern = other110.sort_key_filter_pattern;
- validate_partition_hash = other110.validate_partition_hash;
- return_expire_ts = other110.return_expire_ts;
- full_scan = other110.full_scan;
- __isset = other110.__isset;
+get_scanner_request::get_scanner_request(const get_scanner_request &other136)
+{
+ start_key = other136.start_key;
+ stop_key = other136.stop_key;
+ start_inclusive = other136.start_inclusive;
+ stop_inclusive = other136.stop_inclusive;
+ batch_size = other136.batch_size;
+ no_value = other136.no_value;
+ hash_key_filter_type = other136.hash_key_filter_type;
+ hash_key_filter_pattern = other136.hash_key_filter_pattern;
+ sort_key_filter_type = other136.sort_key_filter_type;
+ sort_key_filter_pattern = other136.sort_key_filter_pattern;
+ validate_partition_hash = other136.validate_partition_hash;
+ return_expire_ts = other136.return_expire_ts;
+ full_scan = other136.full_scan;
+ __isset = other136.__isset;
+}
+get_scanner_request::get_scanner_request(get_scanner_request &&other137)
+{
+ start_key = std::move(other137.start_key);
+ stop_key = std::move(other137.stop_key);
+ start_inclusive = std::move(other137.start_inclusive);
+ stop_inclusive = std::move(other137.stop_inclusive);
+ batch_size = std::move(other137.batch_size);
+ no_value = std::move(other137.no_value);
+ hash_key_filter_type = std::move(other137.hash_key_filter_type);
+ hash_key_filter_pattern = std::move(other137.hash_key_filter_pattern);
+ sort_key_filter_type = std::move(other137.sort_key_filter_type);
+ sort_key_filter_pattern = std::move(other137.sort_key_filter_pattern);
+ validate_partition_hash = std::move(other137.validate_partition_hash);
+ return_expire_ts = std::move(other137.return_expire_ts);
+ full_scan = std::move(other137.full_scan);
+ __isset = std::move(other137.__isset);
+}
+get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other138)
+{
+ start_key = other138.start_key;
+ stop_key = other138.stop_key;
+ start_inclusive = other138.start_inclusive;
+ stop_inclusive = other138.stop_inclusive;
+ batch_size = other138.batch_size;
+ no_value = other138.no_value;
+ hash_key_filter_type = other138.hash_key_filter_type;
+ hash_key_filter_pattern = other138.hash_key_filter_pattern;
+ sort_key_filter_type = other138.sort_key_filter_type;
+ sort_key_filter_pattern = other138.sort_key_filter_pattern;
+ validate_partition_hash = other138.validate_partition_hash;
+ return_expire_ts = other138.return_expire_ts;
+ full_scan = other138.full_scan;
+ __isset = other138.__isset;
return *this;
}
-get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other111)
-{
- start_key = std::move(other111.start_key);
- stop_key = std::move(other111.stop_key);
- start_inclusive = std::move(other111.start_inclusive);
- stop_inclusive = std::move(other111.stop_inclusive);
- batch_size = std::move(other111.batch_size);
- no_value = std::move(other111.no_value);
- hash_key_filter_type = std::move(other111.hash_key_filter_type);
- hash_key_filter_pattern = std::move(other111.hash_key_filter_pattern);
- sort_key_filter_type = std::move(other111.sort_key_filter_type);
- sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern);
- validate_partition_hash = std::move(other111.validate_partition_hash);
- return_expire_ts = std::move(other111.return_expire_ts);
- full_scan = std::move(other111.full_scan);
- __isset = std::move(other111.__isset);
+get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other139)
+{
+ start_key = std::move(other139.start_key);
+ stop_key = std::move(other139.stop_key);
+ start_inclusive = std::move(other139.start_inclusive);
+ stop_inclusive = std::move(other139.stop_inclusive);
+ batch_size = std::move(other139.batch_size);
+ no_value = std::move(other139.no_value);
+ hash_key_filter_type = std::move(other139.hash_key_filter_type);
+ hash_key_filter_pattern = std::move(other139.hash_key_filter_pattern);
+ sort_key_filter_type = std::move(other139.sort_key_filter_type);
+ sort_key_filter_pattern = std::move(other139.sort_key_filter_pattern);
+ validate_partition_hash = std::move(other139.validate_partition_hash);
+ return_expire_ts = std::move(other139.return_expire_ts);
+ full_scan = std::move(other139.full_scan);
+ __isset = std::move(other139.__isset);
return *this;
}
void get_scanner_request::printTo(std::ostream &out) const
@@ -4116,26 +4682,26 @@ void swap(scan_request &a, scan_request &b)
swap(a.__isset, b.__isset);
}
-scan_request::scan_request(const scan_request &other112)
+scan_request::scan_request(const scan_request &other140)
{
- context_id = other112.context_id;
- __isset = other112.__isset;
+ context_id = other140.context_id;
+ __isset = other140.__isset;
}
-scan_request::scan_request(scan_request &&other113)
+scan_request::scan_request(scan_request &&other141)
{
- context_id = std::move(other113.context_id);
- __isset = std::move(other113.__isset);
+ context_id = std::move(other141.context_id);
+ __isset = std::move(other141.__isset);
}
-scan_request &scan_request::operator=(const scan_request &other114)
+scan_request &scan_request::operator=(const scan_request &other142)
{
- context_id = other114.context_id;
- __isset = other114.__isset;
+ context_id = other142.context_id;
+ __isset = other142.__isset;
return *this;
}
-scan_request &scan_request::operator=(scan_request &&other115)
+scan_request &scan_request::operator=(scan_request &&other143)
{
- context_id = std::move(other115.context_id);
- __isset = std::move(other115.__isset);
+ context_id = std::move(other143.context_id);
+ __isset = std::move(other143.__isset);
return *this;
}
void scan_request::printTo(std::ostream &out) const
@@ -4191,13 +4757,13 @@ uint32_t scan_response::read(::apache::thrift::protocol::TProtocol *iprot)
if (ftype == ::apache::thrift::protocol::T_LIST) {
{
this->kvs.clear();
- uint32_t _size116;
- ::apache::thrift::protocol::TType _etype119;
- xfer += iprot->readListBegin(_etype119, _size116);
- this->kvs.resize(_size116);
- uint32_t _i120;
- for (_i120 = 0; _i120 < _size116; ++_i120) {
- xfer += this->kvs[_i120].read(iprot);
+ uint32_t _size144;
+ ::apache::thrift::protocol::TType _etype147;
+ xfer += iprot->readListBegin(_etype147, _size144);
+ this->kvs.resize(_size144);
+ uint32_t _i148;
+ for (_i148 = 0; _i148 < _size144; ++_i148) {
+ xfer += this->kvs[_i148].read(iprot);
}
xfer += iprot->readListEnd();
}
@@ -4264,9 +4830,9 @@ uint32_t scan_response::write(::apache::thrift::protocol::TProtocol *oprot) cons
{
xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT,
static_cast<uint32_t>(this->kvs.size()));
- std::vector<key_value>::const_iterator _iter121;
- for (_iter121 = this->kvs.begin(); _iter121 != this->kvs.end(); ++_iter121) {
- xfer += (*_iter121).write(oprot);
+ std::vector<key_value>::const_iterator _iter149;
+ for (_iter149 = this->kvs.begin(); _iter149 != this->kvs.end(); ++_iter149) {
+ xfer += (*_iter149).write(oprot);
}
xfer += oprot->writeListEnd();
}
@@ -4305,46 +4871,46 @@ void swap(scan_response &a, scan_response &b)
swap(a.__isset, b.__isset);
}
-scan_response::scan_response(const scan_response &other122)
+scan_response::scan_response(const scan_response &other150)
{
- error = other122.error;
- kvs = other122.kvs;
- context_id = other122.context_id;
- app_id = other122.app_id;
- partition_index = other122.partition_index;
- server = other122.server;
- __isset = other122.__isset;
+ error = other150.error;
+ kvs = other150.kvs;
+ context_id = other150.context_id;
+ app_id = other150.app_id;
+ partition_index = other150.partition_index;
+ server = other150.server;
+ __isset = other150.__isset;
}
-scan_response::scan_response(scan_response &&other123)
+scan_response::scan_response(scan_response &&other151)
{
- error = std::move(other123.error);
- kvs = std::move(other123.kvs);
- context_id = std::move(other123.context_id);
- app_id = std::move(other123.app_id);
- partition_index = std::move(other123.partition_index);
- server = std::move(other123.server);
- __isset = std::move(other123.__isset);
+ error = std::move(other151.error);
+ kvs = std::move(other151.kvs);
+ context_id = std::move(other151.context_id);
+ app_id = std::move(other151.app_id);
+ partition_index = std::move(other151.partition_index);
+ server = std::move(other151.server);
+ __isset = std::move(other151.__isset);
}
-scan_response &scan_response::operator=(const scan_response &other124)
+scan_response &scan_response::operator=(const scan_response &other152)
{
- error = other124.error;
- kvs = other124.kvs;
- context_id = other124.context_id;
- app_id = other124.app_id;
- partition_index = other124.partition_index;
- server = other124.server;
- __isset = other124.__isset;
+ error = other152.error;
+ kvs = other152.kvs;
+ context_id = other152.context_id;
+ app_id = other152.app_id;
+ partition_index = other152.partition_index;
+ server = other152.server;
+ __isset = other152.__isset;
return *this;
}
-scan_response &scan_response::operator=(scan_response &&other125)
+scan_response &scan_response::operator=(scan_response &&other153)
{
- error = std::move(other125.error);
- kvs = std::move(other125.kvs);
- context_id = std::move(other125.context_id);
- app_id = std::move(other125.app_id);
- partition_index = std::move(other125.partition_index);
- server = std::move(other125.server);
- __isset = std::move(other125.__isset);
+ error = std::move(other153.error);
+ kvs = std::move(other153.kvs);
+ context_id = std::move(other153.context_id);
+ app_id = std::move(other153.app_id);
+ partition_index = std::move(other153.partition_index);
+ server = std::move(other153.server);
+ __isset = std::move(other153.__isset);
return *this;
}
void scan_response::printTo(std::ostream &out) const
@@ -4515,42 +5081,42 @@ void swap(duplicate_request &a, duplicate_request &b)
swap(a.__isset, b.__isset);
}
-duplicate_request::duplicate_request(const duplicate_request &other126)
+duplicate_request::duplicate_request(const duplicate_request &other154)
{
- timestamp = other126.timestamp;
- task_code = other126.task_code;
- raw_message = other126.raw_message;
- cluster_id = other126.cluster_id;
- verify_timetag = other126.verify_timetag;
- __isset = other126.__isset;
+ timestamp = other154.timestamp;
+ task_code = other154.task_code;
+ raw_message = other154.raw_message;
+ cluster_id = other154.cluster_id;
+ verify_timetag = other154.verify_timetag;
+ __isset = other154.__isset;
}
-duplicate_request::duplicate_request(duplicate_request &&other127)
+duplicate_request::duplicate_request(duplicate_request &&other155)
{
- timestamp = std::move(other127.timestamp);
- task_code = std::move(other127.task_code);
- raw_message = std::move(other127.raw_message);
- cluster_id = std::move(other127.cluster_id);
- verify_timetag = std::move(other127.verify_timetag);
- __isset = std::move(other127.__isset);
+ timestamp = std::move(other155.timestamp);
+ task_code = std::move(other155.task_code);
+ raw_message = std::move(other155.raw_message);
+ cluster_id = std::move(other155.cluster_id);
+ verify_timetag = std::move(other155.verify_timetag);
+ __isset = std::move(other155.__isset);
}
-duplicate_request &duplicate_request::operator=(const duplicate_request &other128)
+duplicate_request &duplicate_request::operator=(const duplicate_request &other156)
{
- timestamp = other128.timestamp;
- task_code = other128.task_code;
- raw_message = other128.raw_message;
- cluster_id = other128.cluster_id;
- verify_timetag = other128.verify_timetag;
- __isset = other128.__isset;
+ timestamp = other156.timestamp;
+ task_code = other156.task_code;
+ raw_message = other156.raw_message;
+ cluster_id = other156.cluster_id;
+ verify_timetag = other156.verify_timetag;
+ __isset = other156.__isset;
return *this;
}
-duplicate_request &duplicate_request::operator=(duplicate_request &&other129)
+duplicate_request &duplicate_request::operator=(duplicate_request &&other157)
{
- timestamp = std::move(other129.timestamp);
- task_code = std::move(other129.task_code);
- raw_message = std::move(other129.raw_message);
- cluster_id = std::move(other129.cluster_id);
- verify_timetag = std::move(other129.verify_timetag);
- __isset = std::move(other129.__isset);
+ timestamp = std::move(other157.timestamp);
+ task_code = std::move(other157.task_code);
+ raw_message = std::move(other157.raw_message);
+ cluster_id = std::move(other157.cluster_id);
+ verify_timetag = std::move(other157.verify_timetag);
+ __isset = std::move(other157.__isset);
return *this;
}
void duplicate_request::printTo(std::ostream &out) const
@@ -4664,30 +5230,30 @@ void swap(duplicate_response &a, duplicate_response &b)
swap(a.__isset, b.__isset);
}
-duplicate_response::duplicate_response(const duplicate_response &other130)
+duplicate_response::duplicate_response(const duplicate_response &other158)
{
- error = other130.error;
- error_hint = other130.error_hint;
- __isset = other130.__isset;
+ error = other158.error;
+ error_hint = other158.error_hint;
+ __isset = other158.__isset;
}
-duplicate_response::duplicate_response(duplicate_response &&other131)
+duplicate_response::duplicate_response(duplicate_response &&other159)
{
- error = std::move(other131.error);
- error_hint = std::move(other131.error_hint);
- __isset = std::move(other131.__isset);
+ error = std::move(other159.error);
+ error_hint = std::move(other159.error_hint);
+ __isset = std::move(other159.__isset);
}
-duplicate_response &duplicate_response::operator=(const duplicate_response &other132)
+duplicate_response &duplicate_response::operator=(const duplicate_response &other160)
{
- error = other132.error;
- error_hint = other132.error_hint;
- __isset = other132.__isset;
+ error = other160.error;
+ error_hint = other160.error_hint;
+ __isset = other160.__isset;
return *this;
}
-duplicate_response &duplicate_response::operator=(duplicate_response &&other133)
+duplicate_response &duplicate_response::operator=(duplicate_response &&other161)
{
- error = std::move(other133.error);
- error_hint = std::move(other133.error_hint);
- __isset = std::move(other133.__isset);
+ error = std::move(other161.error);
+ error_hint = std::move(other161.error_hint);
+ __isset = std::move(other161.__isset);
return *this;
}
void duplicate_response::printTo(std::ostream &out) const
@@ -4701,6 +5267,5 @@ void duplicate_response::printTo(std::ostream &out) const
(__isset.error_hint ? (out << to_string(error_hint)) : (out << "<null>"));
out << ")";
}
-
-} // namespace apps
-} // namespace dsn
+}
+} // namespace
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index b81483b..99316d2 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -163,6 +163,29 @@ struct multi_get_response
6:string server;
}
+struct batch_get_request { // argument type of rrdb.batch_get
+ 1:list<full_key> keys; // one (hash_key, sort_key) pair per entry; full_key is declared below
+}
+
+struct full_key { // element type of batch_get_request.keys
+ 1:dsn.blob hash_key;
+ 2:dsn.blob sort_key;
+}
+
+struct batch_get_response { // return type of rrdb.batch_get
+ 1:i32 error;
+ 2:list<full_data> data; // full_data is declared below
+ 3:i32 app_id;
+ 4:i32 partition_index;
+ 6:string server; // field id 5 is not used in this struct
+}
+
+struct full_data { // element type of batch_get_response.data
+ 1:dsn.blob hash_key;
+ 2:dsn.blob sort_key;
+ 3:dsn.blob value;
+}
+
struct incr_request
{
1:dsn.blob key;
@@ -310,6 +333,7 @@ service rrdb
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request);
read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request);
+ batch_get_response batch_get(1:batch_get_request request);
count_response sortkey_count(1:dsn.blob hash_key);
ttl_response ttl(1:dsn.blob key);
diff --git a/src/include/rrdb/rrdb.client.h b/src/include/rrdb/rrdb.client.h
index 824a644..a9e9eed 100644
--- a/src/include/rrdb/rrdb.client.h
+++ b/src/include/rrdb/rrdb.client.h
@@ -299,6 +299,32 @@ public:
reply_thread_hash);
}
+ // ---------- call RPC_RRDB_RRDB_BATCH_GET ------------
+ // - synchronous: returns the (error_code, batch_get_response) pair from wait_and_unwrap
+ std::pair<::dsn::error_code, batch_get_response> batch_get_sync(
+ const batch_get_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
+ { // empty_rpc_handler is passed as the callback; the result is taken from wait_and_unwrap
+ return ::dsn::rpc::wait_and_unwrap<batch_get_response>(_resolver->call_op(
+ RPC_RRDB_RRDB_BATCH_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
+ }
+
+ // - asynchronous with on-stack batch_get_request and batch_get_response
+ template <typename TCallback>
+ ::dsn::task_ptr batch_get(const batch_get_request &args,
+ TCallback &&callback,
+ std::chrono::milliseconds timeout,
+ uint64_t request_partition_hash,
+ int reply_thread_hash = 0)
+ { // forwards all arguments to _resolver->call_op and returns the task_ptr it yields
+ return _resolver->call_op(RPC_RRDB_RRDB_BATCH_GET,
+ args,
+ &_tracker,
+ std::forward<TCallback>(callback),
+ timeout,
+ request_partition_hash,
+ reply_thread_hash);
+ }
+
// ---------- call RPC_RRDB_RRDB_SORTKEY_COUNT ------------
// - synchronous
std::pair<::dsn::error_code, count_response> sortkey_count_sync(
diff --git a/src/include/rrdb/rrdb.code.definition.h b/src/include/rrdb/rrdb.code.definition.h
index f9cfb54..9cc821d 100644
--- a/src/include/rrdb/rrdb.code.definition.h
+++ b/src/include/rrdb/rrdb.code.definition.h
@@ -37,5 +37,6 @@ DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_GET_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_SCAN)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_CLEAR_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
+DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_BATCH_GET)
}
}
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 205216a..0e36678 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -93,6 +93,14 @@ class multi_get_request;
class multi_get_response;
+class batch_get_request;
+
+class full_key;
+
+class batch_get_response;
+
+class full_data;
+
class incr_request;
class incr_response;
@@ -937,6 +945,235 @@ inline std::ostream &operator<<(std::ostream &out, const multi_get_response &obj
return out;
}
+// Per-field flags for batch_get_request; the constructor clears every flag.
+typedef struct _batch_get_request__isset
+{
+    _batch_get_request__isset() : keys(false) {}
+    bool keys : 1;
+} _batch_get_request__isset;
+
+// Request payload of the BATCH_GET rpc: the list of full keys to look up.
+// NOTE(review): the read/write signatures take a Thrift TProtocol, so this looks like
+// generated code - prefer regenerating from the IDL over editing by hand.
+class batch_get_request
+{
+public:
+    batch_get_request() {}
+    batch_get_request(const batch_get_request &);
+    batch_get_request(batch_get_request &&);
+    batch_get_request &operator=(const batch_get_request &);
+    batch_get_request &operator=(batch_get_request &&);
+
+    virtual ~batch_get_request() throw();
+
+    std::vector<full_key> keys;
+
+    _batch_get_request__isset __isset;
+
+    void __set_keys(const std::vector<full_key> &val);
+
+    // Requests compare equal when their key lists compare equal.
+    bool operator==(const batch_get_request &rhs) const { return keys == rhs.keys; }
+    bool operator!=(const batch_get_request &rhs) const { return !operator==(rhs); }
+
+    // Only declared here.
+    bool operator<(const batch_get_request &) const;
+
+    // (De)serialization through the given protocol object.
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(batch_get_request &a, batch_get_request &b);
+
+inline std::ostream &operator<<(std::ostream &out, const batch_get_request &obj) // stream insertion: delegates to obj.printTo
+{
+ obj.printTo(out);
+ return out; // hand the stream back so insertions can be chained
+}
+
+// Per-field flags for full_key; the constructor clears every flag.
+typedef struct _full_key__isset
+{
+    _full_key__isset() : hash_key(false), sort_key(false) {}
+    bool hash_key : 1;
+    bool sort_key : 1;
+} _full_key__isset;
+
+// A complete key: hash_key plus sort_key, both carried as blobs.
+class full_key
+{
+public:
+    full_key() {}
+    full_key(const full_key &);
+    full_key(full_key &&);
+    full_key &operator=(const full_key &);
+    full_key &operator=(full_key &&);
+
+    virtual ~full_key() throw();
+
+    ::dsn::blob hash_key;
+    ::dsn::blob sort_key;
+
+    _full_key__isset __isset;
+
+    void __set_hash_key(const ::dsn::blob &val);
+    void __set_sort_key(const ::dsn::blob &val);
+
+    // Field-wise equality, hash_key compared first.
+    bool operator==(const full_key &rhs) const
+    {
+        return hash_key == rhs.hash_key && sort_key == rhs.sort_key;
+    }
+    bool operator!=(const full_key &rhs) const { return !operator==(rhs); }
+
+    // Only declared here.
+    bool operator<(const full_key &) const;
+
+    // (De)serialization through the given protocol object.
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(full_key &a, full_key &b);
+
+inline std::ostream &operator<<(std::ostream &out, const full_key &obj) // stream insertion: delegates to obj.printTo
+{
+ obj.printTo(out);
+ return out; // hand the stream back so insertions can be chained
+}
+
+// Per-field flags for batch_get_response; the constructor clears every flag.
+typedef struct _batch_get_response__isset
+{
+    _batch_get_response__isset()
+        : error(false), data(false), app_id(false), partition_index(false), server(false)
+    {
+    }
+    bool error : 1;
+    bool data : 1;
+    bool app_id : 1;
+    bool partition_index : 1;
+    bool server : 1;
+} _batch_get_response__isset;
+
+// Response payload of the BATCH_GET rpc: an error code, the returned rows and the
+// app id / partition index / server string describing who answered.
+class batch_get_response
+{
+public:
+    // Numeric fields start at 0 and `server` starts empty.
+    batch_get_response() : error(0), app_id(0), partition_index(0), server() {}
+    batch_get_response(const batch_get_response &);
+    batch_get_response(batch_get_response &&);
+    batch_get_response &operator=(const batch_get_response &);
+    batch_get_response &operator=(batch_get_response &&);
+
+    virtual ~batch_get_response() throw();
+
+    int32_t error;
+    std::vector<full_data> data;
+    int32_t app_id;
+    int32_t partition_index;
+    std::string server;
+
+    _batch_get_response__isset __isset;
+
+    void __set_error(const int32_t val);
+    void __set_data(const std::vector<full_data> &val);
+    void __set_app_id(const int32_t val);
+    void __set_partition_index(const int32_t val);
+    void __set_server(const std::string &val);
+
+    // Field-wise equality, evaluated in declaration order and stopping at the first mismatch.
+    bool operator==(const batch_get_response &rhs) const
+    {
+        return error == rhs.error && data == rhs.data && app_id == rhs.app_id &&
+               partition_index == rhs.partition_index && server == rhs.server;
+    }
+    bool operator!=(const batch_get_response &rhs) const { return !operator==(rhs); }
+
+    // Only declared here.
+    bool operator<(const batch_get_response &) const;
+
+    // (De)serialization through the given protocol object.
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(batch_get_response &a, batch_get_response &b);
+
+inline std::ostream &operator<<(std::ostream &out, const batch_get_response &obj) // stream insertion: delegates to obj.printTo
+{
+ obj.printTo(out);
+ return out; // hand the stream back so insertions can be chained
+}
+
+// Per-field flags for full_data; the constructor clears every flag.
+typedef struct _full_data__isset
+{
+    _full_data__isset() : hash_key(false), sort_key(false), value(false) {}
+    bool hash_key : 1;
+    bool sort_key : 1;
+    bool value : 1;
+} _full_data__isset;
+
+// One row: hash_key and sort_key together with the value, all carried as blobs.
+class full_data
+{
+public:
+    full_data() {}
+    full_data(const full_data &);
+    full_data(full_data &&);
+    full_data &operator=(const full_data &);
+    full_data &operator=(full_data &&);
+
+    virtual ~full_data() throw();
+
+    ::dsn::blob hash_key;
+    ::dsn::blob sort_key;
+    ::dsn::blob value;
+
+    _full_data__isset __isset;
+
+    void __set_hash_key(const ::dsn::blob &val);
+    void __set_sort_key(const ::dsn::blob &val);
+    void __set_value(const ::dsn::blob &val);
+
+    // Field-wise equality, evaluated in declaration order and stopping at the first mismatch.
+    bool operator==(const full_data &rhs) const
+    {
+        return hash_key == rhs.hash_key && sort_key == rhs.sort_key && value == rhs.value;
+    }
+    bool operator!=(const full_data &rhs) const { return !operator==(rhs); }
+
+    // Only declared here.
+    bool operator<(const full_data &) const;
+
+    // (De)serialization through the given protocol object.
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(full_data &a, full_data &b);
+
+inline std::ostream &operator<<(std::ostream &out, const full_data &obj) // stream insertion: delegates to obj.printTo
+{
+ obj.printTo(out);
+ return out; // hand the stream back so insertions can be chained
+}
+
typedef struct _incr_request__isset
{
_incr_request__isset() : key(false), increment(false), expire_ts_seconds(false) {}
@@ -1977,8 +2214,7 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj
obj.printTo(out);
return out;
}
-
-} // namespace apps
-} // namespace dsn
+}
+} // namespace
#endif
diff --git a/src/server/brief_stat.cpp b/src/server/brief_stat.cpp
index 767f6b4..04be2fd 100644
--- a/src/server/brief_stat.cpp
+++ b/src/server/brief_stat.cpp
@@ -30,6 +30,8 @@ static std::map<std::string, std::string> s_brief_stat_map = {
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", "get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps", "multi_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", "multi_get_p99(ns)"},
+ {"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.qps", "batch_get_qps"},
+ {"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server", "batch_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.qps", "put_qps"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", "put_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps", "multi_put_qps"},
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 5f978cd..c630bef 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -80,6 +80,10 @@ capacity_unit_calculator::capacity_unit_calculator(
_pfc_multi_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");
+ snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str());
+ _pfc_batch_get_bytes.init_app_counter(
+ "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes");
+
snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
_pfc_scan_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
@@ -175,6 +179,32 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}
+// Accounts one BATCH_GET reply: feeds every returned hash key to the read hotkey collector,
+// adds the returned bytes to the batch-get and backup-request byte statistics, and charges
+// read capacity units according to `status`.
+//   req    - the originating request message, forwarded to add_backup_request_bytes
+//   status - rocksdb status code of the whole operation
+//   datas  - the rows placed in the response
+void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
+                                                int32_t status,
+                                                const std::vector<::dsn::apps::full_data> &datas)
+{
+    // A row is measured as hash_key + sort_key + value.
+    int64_t total_bytes = 0;
+    for (const auto &row : datas) {
+        _read_hotkey_collector->capture_hash_key(row.hash_key, 1);
+        total_bytes += row.hash_key.size() + row.sort_key.size() + row.value.size();
+    }
+
+    _pfc_batch_get_bytes->add(total_bytes);
+    add_backup_request_bytes(req, total_bytes);
+
+    // Only these four outcomes are charged; any other status costs nothing.
+    const bool chargeable =
+        status == rocksdb::Status::kOk || status == rocksdb::Status::kNotFound ||
+        status == rocksdb::Status::kIncomplete || status == rocksdb::Status::kInvalidArgument;
+    if (!chargeable) {
+        return;
+    }
+
+    if (status == rocksdb::Status::kNotFound) {
+        // Not-found is charged a fixed amount of 1.
+        add_read_cu(1);
+    } else {
+        add_read_cu(total_bytes);
+    }
+}
+
void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::key_value> &kvs)
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index f7224a2..0ff0ac7 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -52,6 +52,9 @@ public:
int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
+ void add_batch_get_cu(dsn::message_ex *req,
+ int32_t status,
+ const std::vector<::dsn::apps::full_data> &rows);
void add_scan_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::key_value> &kvs);
@@ -101,6 +104,7 @@ private:
::dsn::perf_counter_wrapper _pfc_get_bytes;
::dsn::perf_counter_wrapper _pfc_multi_get_bytes;
+ ::dsn::perf_counter_wrapper _pfc_batch_get_bytes;
::dsn::perf_counter_wrapper _pfc_scan_bytes;
::dsn::perf_counter_wrapper _pfc_put_bytes;
::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
diff --git a/src/server/config.ini b/src/server/config.ini
index b823a3f..16148ea 100644
--- a/src/server/config.ini
+++ b/src/server/config.ini
@@ -629,6 +629,15 @@
[task.RPC_RRDB_RRDB_MULTI_GET_ACK]
is_profile = true
+[task.RPC_RRDB_RRDB_BATCH_GET]
+ rpc_request_throttling_mode = TM_DELAY
+ rpc_request_delays_milliseconds = 50, 50, 50, 50, 50, 100
+ is_profile = true
+ profiler::size.response.server = true
+
+[task.RPC_RRDB_RRDB_BATCH_GET_ACK]
+ is_profile = true
+
[task.RPC_RRDB_RRDB_SORTKEY_COUNT]
;rpc_request_throttling_mode = TM_DELAY
;rpc_request_delays_milliseconds = 50, 50, 50, 50, 50, 100
diff --git a/src/server/config.min.ini b/src/server/config.min.ini
index 3e7556b..dfcc294 100644
--- a/src/server/config.min.ini
+++ b/src/server/config.min.ini
@@ -239,3 +239,10 @@
is_profile = true
profiler::size.response.server = true
rpc_request_throttling_mode = TM_REJECT
+
+[task.RPC_RRDB_RRDB_BATCH_GET]
+ is_profile = true
+ profiler::size.response.server = true
+
+[task.RPC_RRDB_RRDB_BATCH_GET_ACK]
+ is_profile = true
diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index 67b6a80..df531b2 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -27,6 +27,8 @@ namespace server {
typedef ::dsn::rpc_holder<::dsn::blob, ::dsn::apps::read_response> get_rpc;
typedef ::dsn::rpc_holder<dsn::apps::multi_get_request, dsn::apps::multi_get_response>
multi_get_rpc;
+typedef ::dsn::rpc_holder<::dsn::apps::batch_get_request, ::dsn::apps::batch_get_response>
+ batch_get_rpc;
typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::count_response> sortkey_count_rpc;
typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::ttl_response> ttl_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response>
@@ -53,6 +55,8 @@ protected:
virtual void on_get(get_rpc rpc) = 0;
// RPC_RRDB_RRDB_MULTI_GET
virtual void on_multi_get(multi_get_rpc rpc) = 0;
+ // RPC_RRDB_RRDB_BATCH_GET
+ virtual void on_batch_get(batch_get_rpc rpc) = 0;
// RPC_RRDB_RRDB_SORTKEY_COUNT
virtual void on_sortkey_count(sortkey_count_rpc rpc) = 0;
// RPC_RRDB_RRDB_TTL
@@ -70,6 +74,8 @@ protected:
register_rpc_handler_with_rpc_holder(
dsn::apps::RPC_RRDB_RRDB_MULTI_GET, "multi_get", on_multi_get);
register_rpc_handler_with_rpc_holder(
+ dsn::apps::RPC_RRDB_RRDB_BATCH_GET, "batch_get", on_batch_get);
+ register_rpc_handler_with_rpc_holder(
dsn::apps::RPC_RRDB_RRDB_SORTKEY_COUNT, "sortkey_count", on_sortkey_count);
register_rpc_handler_with_rpc_holder(dsn::apps::RPC_RRDB_RRDB_TTL, "ttl", on_ttl);
register_rpc_handler_with_rpc_holder(
@@ -85,6 +91,10 @@ private:
{
svc->on_multi_get(rpc);
}
+ static void on_batch_get(pegasus_read_service *svc, batch_get_rpc rpc) // static entry point for BATCH_GET requests
+ {
+ svc->on_batch_get(rpc); // forward to the service's virtual on_batch_get
+ }
static void on_sortkey_count(pegasus_read_service *svc, sortkey_count_rpc rpc)
{
svc->on_sortkey_count(rpc);
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 652d4e2..4221aea 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -776,6 +776,122 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+// Handles RPC_RRDB_RRDB_BATCH_GET: resolves every (hash_key, sort_key) pair of the request
+// with one rocksdb MultiGet call and returns the rows that exist and have not expired.
+//
+// Outcome reported in response.error:
+//   - kInvalidArgument when the request carries no keys;
+//   - the code of the first per-key status that is neither ok nor not-found
+//     (response.data is emptied in that case);
+//   - kOk otherwise; keys that are missing or expired are simply absent from response.data.
+// If the read-size throttling controller has no budget, rpc.error() is set to ERR_BUSY and
+// the function returns before touching the database.
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+    dassert(_is_open, "");
+    _pfc_batch_get_qps->increment();
+    int64_t start_time = dsn_now_ns();
+
+    auto &response = rpc.response();
+    response.app_id = _gpid.get_app_id();
+    response.partition_index = _gpid.get_partition_index();
+    response.server = _primary_address;
+
+    if (!_read_size_throttling_controller->available()) {
+        rpc.error() = dsn::ERR_BUSY;
+        _counter_recent_read_throttling_reject_count->increment();
+        return;
+    }
+
+    const auto &request = rpc.request();
+    if (request.keys.empty()) {
+        response.error = rocksdb::Status::kInvalidArgument;
+        derror_replica("Invalid argument for batch_get from {}: 'keys' field in request is empty",
+                       rpc.remote_address().to_string());
+        _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
+        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+        return;
+    }
+
+    // `keys` holds slices over the generated raw keys; `keys_holder` keeps the blobs those
+    // slices were built from alive until MultiGet has finished.
+    std::vector<rocksdb::Slice> keys;
+    keys.reserve(request.keys.size());
+    std::vector<::dsn::blob> keys_holder;
+    keys_holder.reserve(request.keys.size());
+    for (const auto &key : request.keys) {
+        dsn::blob raw_key;
+        pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+        keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+        keys_holder.emplace_back(std::move(raw_key));
+    }
+
+    rocksdb::Status final_status;
+    bool error_occurred = false;
+    int64_t total_data_size = 0;
+    uint32_t epoch_now = pegasus::utils::epoch_now();
+    std::vector<std::string> values;
+    std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
+    response.data.reserve(request.keys.size());
+    // size_t index: keys.size() is unsigned, so an int counter would mix signedness.
+    for (size_t i = 0; i < keys.size(); ++i) {
+        const auto &status = statuses[i];
+        if (status.IsNotFound()) {
+            continue;
+        }
+
+        const ::dsn::blob &hash_key = request.keys[i].hash_key;
+        const ::dsn::blob &sort_key = request.keys[i].sort_key;
+        std::string &value = values[i];
+
+        if (dsn_likely(status.ok())) {
+            if (check_if_record_expired(epoch_now, value)) {
+                // NOTE(review): expired rows are skipped without updating
+                // _pfc_recent_expire_count - confirm whether that is intended.
+                if (_verbose_log) {
+                    derror_replica(
+                        "rocksdb data expired for batch_get from {}, hash_key = {}, sort_key = {}",
+                        rpc.remote_address().to_string(),
+                        pegasus::utils::c_escape_string(hash_key),
+                        pegasus::utils::c_escape_string(sort_key));
+                }
+                continue;
+            }
+
+            dsn::blob real_value;
+            pegasus_extract_user_data(_pegasus_data_version, std::move(value), real_value);
+            dsn::apps::full_data current_data;
+            current_data.hash_key = hash_key;
+            current_data.sort_key = sort_key;
+            current_data.value = std::move(real_value);
+            // Count the whole row (keys and value): the abnormal-bytes threshold is documented
+            // as "total key-value bytes" and the capacity-unit accounting measures rows the
+            // same way.
+            total_data_size += current_data.hash_key.size() + current_data.sort_key.size() +
+                               current_data.value.size();
+            response.data.emplace_back(std::move(current_data));
+        } else {
+            if (_verbose_log) {
+                derror_replica(
+                    "rocksdb get failed for batch_get from {}: error = {}, key size = {}",
+                    rpc.remote_address().to_string(),
+                    status.ToString(),
+                    request.keys.size());
+            } else {
+                derror_replica("rocksdb get failed for batch_get from {}: error = {}",
+                               rpc.remote_address().to_string(),
+                               status.ToString());
+            }
+
+            // The first hard failure aborts the whole batch.
+            error_occurred = true;
+            final_status = status;
+            break;
+        }
+    }
+
+    if (error_occurred) {
+        // Never return a partial result next to an error code.
+        response.error = final_status.code();
+        response.data.clear();
+    } else {
+        response.error = rocksdb::Status::kOk;
+    }
+
+    int64_t time_used = dsn_now_ns() - start_time;
+    if (is_batch_get_abnormal(time_used, total_data_size, request.keys.size())) {
+        dwarn_replica("rocksdb abnormal batch_get from {}: total data size = {}, row count = {}, "
+                      "time_used = {} us",
+                      rpc.remote_address().to_string(),
+                      total_data_size,
+                      request.keys.size(),
+                      time_used / 1000);
+        _pfc_recent_abnormal_count->increment();
+    }
+
+    _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
+    _pfc_batch_get_latency->set(time_used);
+}
+
void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
{
dassert(_is_open, "");
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 05de528..d7f955b 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -26,6 +26,7 @@
#include <rocksdb/options.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/replication.codes.h>
+#include <dsn/utility/flags.h>
#include <rrdb/rrdb_types.h>
#include <gtest/gtest_prod.h>
#include <rocksdb/rate_limiter.h>
@@ -47,6 +48,9 @@ typedef dsn::utils::token_bucket_throttling_controller throttling_controller;
namespace pegasus {
namespace server {
+DSN_DECLARE_uint64(rocksdb_abnormal_batch_get_bytes_threshold);
+DSN_DECLARE_uint64(rocksdb_abnormal_batch_get_count_threshold);
+
class meta_store;
class capacity_unit_calculator;
class pegasus_server_write;
@@ -76,6 +80,7 @@ public:
// the following methods may set physical error if internal error occurs
void on_get(get_rpc rpc) override;
void on_multi_get(multi_get_rpc rpc) override;
+ void on_batch_get(batch_get_rpc rpc) override;
void on_sortkey_count(sortkey_count_rpc rpc) override;
void on_ttl(ttl_rpc rpc) override;
void on_get_scanner(get_scanner_rpc rpc) override;
@@ -343,6 +348,23 @@ private:
return false;
}
+    // Tells whether a finished batch_get should be reported as abnormal.
+    //   time_used - elapsed time, compared against _slow_query_threshold_ns
+    //   size      - byte total, compared against the bytes-threshold flag
+    //   count     - key total, compared against the count-threshold flag
+    // A threshold flag equal to 0 switches its check off.
+    bool is_batch_get_abnormal(uint64_t time_used, uint64_t size, uint64_t count)
+    {
+        return (FLAGS_rocksdb_abnormal_batch_get_bytes_threshold &&
+                size >= FLAGS_rocksdb_abnormal_batch_get_bytes_threshold) ||
+               (FLAGS_rocksdb_abnormal_batch_get_count_threshold &&
+                count >= FLAGS_rocksdb_abnormal_batch_get_count_threshold) ||
+               time_used >= _slow_query_threshold_ns;
+    }
+
bool is_get_abnormal(uint64_t time_used, uint64_t value_size)
{
if (_abnormal_get_size_threshold && value_size >= _abnormal_get_size_threshold) {
@@ -444,10 +466,12 @@ private:
// perf counters
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
+ ::dsn::perf_counter_wrapper _pfc_batch_get_qps;
::dsn::perf_counter_wrapper _pfc_scan_qps;
::dsn::perf_counter_wrapper _pfc_get_latency;
::dsn::perf_counter_wrapper _pfc_multi_get_latency;
+ ::dsn::perf_counter_wrapper _pfc_batch_get_latency;
::dsn::perf_counter_wrapper _pfc_scan_latency;
::dsn::perf_counter_wrapper _pfc_recent_expire_count;
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 2aca7d5..80d7069 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -81,6 +81,20 @@ DSN_DEFINE_validator(read_amp_bytes_per_bit, [](const int64_t read_amp_bytes_per
(read_amp_bytes_per_bit & (read_amp_bytes_per_bit - 1)) == 0);
});
+DSN_DEFINE_uint64("pegasus.server",
+ rocksdb_abnormal_batch_get_bytes_threshold,
+ 1e7, // default: 10,000,000 (written as a floating-point literal)
+ "batch-get operation total key-value bytes size exceed this "
+ "threshold will be logged, 0 means no check");
+DSN_TAG_VARIABLE(rocksdb_abnormal_batch_get_bytes_threshold, FT_MUTABLE); // NOTE(review): FT_MUTABLE presumably allows changing the flag at runtime - confirm
+
+DSN_DEFINE_uint64(
+ "pegasus.server",
+ rocksdb_abnormal_batch_get_count_threshold,
+ 2000, // default: 2000
+ "batch-get operation iterate count exceed this threshold will be logged, 0 means no check");
+DSN_TAG_VARIABLE(rocksdb_abnormal_batch_get_count_threshold, FT_MUTABLE); // NOTE(review): FT_MUTABLE presumably allows changing the flag at runtime - confirm
+
static const std::unordered_map<std::string, rocksdb::BlockBasedTableOptions::IndexType>
INDEX_TYPE_STRING_MAP = {
{"binary_search", rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch},
@@ -558,6 +572,10 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_pfc_multi_get_qps.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");
+ snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str());
+ _pfc_batch_get_qps.init_app_counter(
+ "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of BATCH_GET request");
+
snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
_pfc_scan_qps.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");
@@ -574,6 +592,12 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of MULTI_GET request");
+ snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str());
+ _pfc_batch_get_latency.init_app_counter("app.pegasus",
+ name,
+ COUNTER_TYPE_NUMBER_PERCENTILES,
+ "statistic the latency of BATCH_GET request");
+
snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
_pfc_scan_latency.init_app_counter("app.pegasus",
name,
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index a76ac41..e1912b2 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -583,7 +583,7 @@ struct row_data
row_data() = default;
explicit row_data(const std::string &row_name) : row_name(row_name) {}
- double get_total_read_qps() const { return get_qps + multi_get_qps + scan_qps; }
+ double get_total_read_qps() const { return get_qps + multi_get_qps + batch_get_qps + scan_qps; } // sum of the get, multi_get, batch_get and scan QPS fields
double get_total_write_qps() const
{
@@ -591,7 +591,10 @@ struct row_data
check_and_mutate_qps + incr_qps + duplicate_qps;
}
- double get_total_read_bytes() const { return get_bytes + multi_get_bytes + scan_bytes; }
+ double get_total_read_bytes() const // sum of the get, multi_get, batch_get and scan byte fields
+ {
+ return get_bytes + multi_get_bytes + batch_get_bytes + scan_bytes;
+ }
double get_total_write_bytes() const
{
@@ -602,6 +605,7 @@ struct row_data
{
get_qps += row.get_qps;
multi_get_qps += row.multi_get_qps;
+ batch_get_qps += row.batch_get_qps;
put_qps += row.put_qps;
multi_put_qps += row.multi_put_qps;
remove_qps += row.remove_qps;
@@ -647,6 +651,7 @@ struct row_data
backup_request_bytes += row.backup_request_bytes;
get_bytes += row.get_bytes;
multi_get_bytes += row.multi_get_bytes;
+ batch_get_bytes += row.batch_get_bytes;
scan_bytes += row.scan_bytes;
put_bytes += row.put_bytes;
multi_put_bytes += row.multi_put_bytes;
@@ -667,6 +672,7 @@ struct row_data
int32_t partition_count = 0;
double get_qps = 0;
double multi_get_qps = 0;
+ double batch_get_qps = 0;
double put_qps = 0;
double multi_put_qps = 0;
double remove_qps = 0;
@@ -709,6 +715,7 @@ struct row_data
double backup_request_bytes = 0;
double get_bytes = 0;
double multi_get_bytes = 0;
+ double batch_get_bytes = 0;
double scan_bytes = 0;
double put_bytes = 0;
double multi_put_bytes = 0;
@@ -731,6 +738,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.get_qps += value;
else if (counter_name == "multi_get_qps")
row.multi_get_qps += value;
+ else if (counter_name == "batch_get_qps")
+ row.batch_get_qps += value;
else if (counter_name == "put_qps")
row.put_qps += value;
else if (counter_name == "multi_put_qps")
@@ -815,6 +824,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.get_bytes += value;
else if (counter_name == "multi_get_bytes")
row.multi_get_bytes += value;
+ else if (counter_name == "batch_get_bytes")
+ row.batch_get_bytes += value;
else if (counter_name == "scan_bytes")
row.scan_bytes += value;
else if (counter_name == "put_bytes")
diff --git a/src/shell/commands.h b/src/shell/commands.h
index be6edaf..d4e36a7 100644
--- a/src/shell/commands.h
+++ b/src/shell/commands.h
@@ -63,10 +63,12 @@ struct list_nodes_helper
double get_qps;
double put_qps;
double multi_get_qps;
+ double batch_get_qps;
double multi_put_qps;
double get_p99;
double put_p99;
double multi_get_p99;
+ double batch_get_p99;
double multi_put_p99;
double read_cu;
double write_cu;
diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp
index b650eb8..b61cc91 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -263,6 +263,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
"perf-counters-by-prefix",
{"replica*app.pegasus*get_qps",
"replica*app.pegasus*multi_get_qps",
+ "replica*app.pegasus*batch_get_qps",
"replica*app.pegasus*put_qps",
"replica*app.pegasus*multi_put_qps",
"replica*app.pegasus*recent.read.cu",
@@ -296,6 +297,8 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
h.get_qps += m.value;
else if (m.name.find("replica*app.pegasus*multi_get_qps") != std::string::npos)
h.multi_get_qps += m.value;
+ else if (m.name.find("replica*app.pegasus*batch_get_qps") != std::string::npos)
+ h.batch_get_qps += m.value;
else if (m.name.find("replica*app.pegasus*put_qps") != std::string::npos)
h.put_qps += m.value;
else if (m.name.find("replica*app.pegasus*multi_put_qps") != std::string::npos)
@@ -322,6 +325,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
+ "zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server",
"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"});
for (int i = 0; i < nodes.size(); ++i) {
@@ -356,6 +360,8 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
h.multi_get_p99 = m.value;
else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.latency.server") != std::string::npos)
h.multi_put_p99 = m.value;
+ else if (m.name.find("RPC_RRDB_RRDB_BATCH_GET.latency.server") != std::string::npos)
+ h.batch_get_p99 = m.value;
}
}
}
@@ -391,6 +397,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
if (show_qps) {
tp.add_column("get_qps", tp_alignment::kRight);
tp.add_column("mget_qps", tp_alignment::kRight);
+ tp.add_column("bget_qps", tp_alignment::kRight);
tp.add_column("read_cu", tp_alignment::kRight);
tp.add_column("put_qps", tp_alignment::kRight);
tp.add_column("mput_qps", tp_alignment::kRight);
@@ -399,6 +406,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
if (show_latency) {
tp.add_column("get_p99(ms)", tp_alignment::kRight);
tp.add_column("mget_p99(ms)", tp_alignment::kRight);
+ tp.add_column("bget_p99(ms)", tp_alignment::kRight);
tp.add_column("put_p99(ms)", tp_alignment::kRight);
tp.add_column("mput_p99(ms)", tp_alignment::kRight);
}
@@ -421,6 +429,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
if (show_qps) {
tp.append_data(kv.second.get_qps);
tp.append_data(kv.second.multi_get_qps);
+ tp.append_data(kv.second.batch_get_qps);
tp.append_data(kv.second.read_cu);
tp.append_data(kv.second.put_qps);
tp.append_data(kv.second.multi_put_qps);
@@ -429,6 +438,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
if (show_latency) {
tp.append_data(kv.second.get_p99 / 1e6);
tp.append_data(kv.second.multi_get_p99 / 1e6);
+ tp.append_data(kv.second.batch_get_p99 / 1e6);
tp.append_data(kv.second.put_p99 / 1e6);
tp.append_data(kv.second.multi_put_p99 / 1e6);
}
diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp
index e4d16c2..299e29f 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -490,6 +490,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
sum.partition_count += row.partition_count;
sum.get_qps += row.get_qps;
sum.multi_get_qps += row.multi_get_qps;
+ sum.batch_get_qps += row.batch_get_qps;
sum.put_qps += row.put_qps;
sum.multi_put_qps += row.multi_put_qps;
sum.remove_qps += row.remove_qps;
@@ -548,6 +549,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
if (!only_usage) {
tp.add_column("GET", tp_alignment::kRight);
tp.add_column("MGET", tp_alignment::kRight);
+ tp.add_column("BGET", tp_alignment::kRight);
tp.add_column("PUT", tp_alignment::kRight);
tp.add_column("MPUT", tp_alignment::kRight);
tp.add_column("DEL", tp_alignment::kRight);
@@ -584,6 +586,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args)
if (!only_usage) {
tp.append_data(row.get_qps);
tp.append_data(row.multi_get_qps);
+ tp.append_data(row.batch_get_qps);
tp.append_data(row.put_qps);
tp.append_data(row.multi_put_qps);
tp.append_data(row.remove_qps);
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index 8995173..fd5aca2 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -66,6 +66,8 @@ GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_ca
exit_if_fail $? "run test slog_lost failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/recall.xml" GTEST_FILTER="drop_and_recall.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test recall failed: $test_case $config_file $table_name"
+GTEST_OUTPUT="xml:$REPORT_DIR/batch_get.xml" GTEST_FILTER="batch_get.*" ./$test_case $config_file $table_name
+exit_if_fail $? "run test batch_get failed: $test_case $config_file $table_name"
if [ $on_travis == "NO" ]; then
GTEST_OUTPUT="xml:$REPORT_DIR/restore.xml" GTEST_FILTER="restore_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test restore_test failed: $test_case $config_file $table_name"
diff --git a/src/test/function_test/test_batch_get.cpp b/src/test/function_test/test_batch_get.cpp
new file mode 100644
index 0000000..f44c95f
--- /dev/null
+++ b/src/test/function_test/test_batch_get.cpp
@@ -0,0 +1,104 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <base/pegasus_const.h>
+#include <base/pegasus_key_schema.h>
+#include <dsn/service_api_c.h>
+#include <dsn/dist/replication/replication_ddl_client.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <rrdb/rrdb_types.h>
+#include <rrdb/rrdb.client.h>
+#include <rocksdb/status.h>
+
+using namespace ::pegasus;
+using namespace ::dsn;
+using namespace ::replication;
+
+extern pegasus_client *client;
+extern std::shared_ptr<replication_ddl_client> ddl_client;
+
+// Exercises the BATCH_GET read path end to end:
+//   1. writes `test_data_count` records one by one with put_sync,
+//   2. sends a single batch_get_sync that asks for all of those keys plus a few keys
+//      that were never written,
+//   3. checks that exactly the written records come back, in request order, with the
+//      values that were stored.
+TEST(batch_get, set_and_then_batch_get)
+{
+    std::vector<rpc_address> meta_list;
+    replica_helper::load_meta_servers(meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "mycluster");
+    // Held by a smart pointer so the client is released even when an ASSERT_* macro
+    // returns from the test body early.
+    std::unique_ptr<apps::rrdb_client> rpc_client(
+        new apps::rrdb_client("mycluster", meta_list, client->get_app_name()));
+
+    const int test_data_count = 100;
+    const int test_no_exist_data_count = 6;
+    const int test_timeout_milliseconds = 3000;
+    const uint64_t test_partition_hash = 0;
+
+    // Builds a full_key whose blobs point at the bytes of the given strings.
+    // NOTE(review): blob::assign(ptr, offset, length) is assumed not to take ownership of
+    // the buffer (confirm against rDSN blob.h), so the caller must keep both strings alive
+    // and in place until the request has been sent.
+    auto make_full_key = [](const std::string &hash_key,
+                            const std::string &sort_key) -> apps::full_key {
+        apps::full_key key;
+        key.__isset.hash_key = true;
+        key.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
+        key.__isset.sort_key = true;
+        key.sort_key.assign(sort_key.c_str(), 0, sort_key.size());
+        return key;
+    };
+
+    apps::batch_get_request batch_request;
+    batch_request.keys.reserve(test_data_count + test_no_exist_data_count);
+
+    // The key/value strings live in these vectors for the whole test. Capacity is reserved
+    // up front so that no reallocation relocates a string after a blob already points at it.
+    std::vector<std::pair<std::string, std::string>> key_pair_list;
+    std::vector<std::string> value_list;
+    std::vector<std::pair<std::string, std::string>> no_exist_key_list;
+    key_pair_list.reserve(test_data_count);
+    value_list.reserve(test_data_count);
+    no_exist_key_list.reserve(test_no_exist_data_count);
+
+    for (int i = 0; i < test_data_count; ++i) {
+        key_pair_list.emplace_back("hash_key_prefix_" + std::to_string(i),
+                                   "sort_key_prefix_" + std::to_string(i));
+        value_list.push_back("value_" + std::to_string(i));
+        const std::string &hash_key = key_pair_list.back().first;
+        const std::string &sort_key = key_pair_list.back().second;
+        const std::string &value = value_list.back();
+
+        apps::update_request one_request;
+        one_request.__isset.key = true;
+        pegasus_generate_key(one_request.key, hash_key, sort_key);
+        one_request.__isset.value = true;
+        one_request.value.assign(value.c_str(), 0, value.size());
+        auto put_result = rpc_client->put_sync(
+            one_request, std::chrono::milliseconds(test_timeout_milliseconds), test_partition_hash);
+        ASSERT_EQ(ERR_OK, put_result.first);
+        ASSERT_EQ(rocksdb::Status::kOk, put_result.second.error);
+
+        batch_request.keys.emplace_back(make_full_key(hash_key, sort_key));
+    }
+
+    // Keys that were never written. Their strings are stored in no_exist_key_list rather
+    // than in loop locals, so the blobs in the request stay valid until batch_get_sync runs.
+    for (int i = 0; i < test_no_exist_data_count; ++i) {
+        no_exist_key_list.emplace_back("hash_key_prefix_no_exist_" + std::to_string(i),
+                                       "sort_key_prefix_no_exist_" + std::to_string(i));
+        batch_request.keys.emplace_back(
+            make_full_key(no_exist_key_list.back().first, no_exist_key_list.back().second));
+    }
+
+    auto batch_get_result = rpc_client->batch_get_sync(
+        batch_request, std::chrono::milliseconds(test_timeout_milliseconds), test_partition_hash);
+    ASSERT_EQ(ERR_OK, batch_get_result.first);
+    auto &response = batch_get_result.second;
+    ASSERT_EQ(rocksdb::Status::kOk, response.error);
+    // Only the records that were written are expected back; compare as size_t to avoid a
+    // signed/unsigned mismatch.
+    ASSERT_EQ(key_pair_list.size(), response.data.size());
+    for (size_t i = 0; i < key_pair_list.size(); ++i) {
+        ASSERT_EQ(response.data[i].hash_key.to_string(), key_pair_list[i].first);
+        ASSERT_EQ(response.data[i].sort_key.to_string(), key_pair_list[i].second);
+        ASSERT_EQ(response.data[i].value.to_string(), value_list[i]);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org