You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/05/26 07:32:06 UTC
[incubator-pegasus] branch master updated: refactor: implement value schema 1 (#735)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 2c505b2 refactor: implement value schema 1 (#735)
2c505b2 is described below
commit 2c505b2228ad275284e6bc05090f74947d8b3cb9
Author: zhao liwei <zl...@163.com>
AuthorDate: Wed May 26 15:31:59 2021 +0800
refactor: implement value schema 1 (#735)
---
rdsn | 2 +-
src/base/pegasus_value_schema.h | 3 +-
src/base/test/value_manager_test.cpp | 4 +-
src/base/test/value_schema_test.cpp | 7 +++-
src/base/value_schema_manager.cpp | 4 +-
src/base/value_schema_v0.cpp | 6 +--
src/base/value_schema_v0.h | 4 +-
.../{value_schema_v0.cpp => value_schema_v1.cpp} | 45 ++++++++++++++--------
src/base/{value_schema_v0.h => value_schema_v1.h} | 14 +++----
9 files changed, 55 insertions(+), 34 deletions(-)
diff --git a/rdsn b/rdsn
index b4b9a4e..cd0a237 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit b4b9a4ecc499289fe651b2bd1f3ab114baa88604
+Subproject commit cd0a237b20a2fb120ec9b32d4b133e04518def8a
diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 3bd2f9f..f278e46 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -239,8 +239,9 @@ private:
enum data_version
{
VERSION_0 = 0,
+ VERSION_1 = 1,
VERSION_COUNT,
- VERSION_MAX = VERSION_0,
+ VERSION_MAX = VERSION_1,
/// TBD(zlw)
};
diff --git a/src/base/test/value_manager_test.cpp b/src/base/test/value_manager_test.cpp
index 30ed3c1..570b20b 100644
--- a/src/base/test/value_manager_test.cpp
+++ b/src/base/test/value_manager_test.cpp
@@ -40,7 +40,9 @@ TEST(value_schema_manager, get_value_schema)
uint32_t version;
bool schema_exist;
} tests[] = {
- {pegasus::data_version::VERSION_0, true}, {pegasus::data_version::VERSION_MAX + 1, false},
+ {pegasus::data_version::VERSION_0, true},
+ {pegasus::data_version::VERSION_1, true},
+ {pegasus::data_version::VERSION_MAX + 1, false},
};
for (const auto &t : tests) {
diff --git a/src/base/test/value_schema_test.cpp b/src/base/test/value_schema_test.cpp
index aa1a40e..2facef4 100644
--- a/src/base/test/value_schema_test.cpp
+++ b/src/base/test/value_schema_test.cpp
@@ -72,6 +72,11 @@ TEST(value_schema, generate_and_extract)
{0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
{0, std::numeric_limits<uint32_t>::max(), 0, ""},
{0, 0, 0, "a"},
+
+ {1, 1000, 10001, ""},
+ {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
+ {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},
+ {1, 0, 0, "a"},
};
for (const auto &t : tests) {
@@ -96,7 +101,7 @@ TEST(value_schema, update_expire_ts)
uint32_t expire_ts;
uint32_t update_expire_ts;
} tests[] = {
- {0, 1000, 10086},
+ {0, 1000, 10086}, {1, 1000, 10086},
};
for (const auto &t : tests) {
diff --git a/src/base/value_schema_manager.cpp b/src/base/value_schema_manager.cpp
index 4caaffe..ca381ef 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -19,6 +19,7 @@
#include "value_schema_manager.h"
#include "value_schema_v0.h"
+#include "value_schema_v1.h"
namespace pegasus {
value_schema_manager::value_schema_manager()
@@ -27,7 +28,8 @@ value_schema_manager::value_schema_manager()
* If someone wants to add a new data version, he only need to implement the new value schema,
* and register it here.
*/
- value_schema_manager::instance().register_schema(dsn::make_unique<value_schema_v0>());
+ register_schema(dsn::make_unique<value_schema_v0>());
+ register_schema(dsn::make_unique<value_schema_v1>());
}
void value_schema_manager::register_schema(std::unique_ptr<value_schema> schema)
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
index 220c3b6..850c07c 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v0.cpp
@@ -40,8 +40,7 @@ std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view val
dsn::blob value_schema_v0::extract_user_data(std::string &&value)
{
auto ret = dsn::blob::create_from_bytes(std::move(value));
- ret.range(sizeof(uint32_t));
- return ret;
+ return ret.range(sizeof(uint32_t));
}
void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
@@ -90,8 +89,7 @@ void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
auto expire_field = static_cast<expire_timestamp_field *>(field.get());
- auto new_expire_ts = expire_field->expire_ts;
- new_expire_ts = dsn::endian::hton(new_expire_ts);
+ auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
}
diff --git a/src/base/value_schema_v0.h b/src/base/value_schema_v0.h
index 5d7712b..4fbde67 100644
--- a/src/base/value_schema_v0.h
+++ b/src/base/value_schema_v0.h
@@ -21,8 +21,6 @@
#include "pegasus_value_schema.h"
-#include <dsn/utility/singleton.h>
-
namespace pegasus {
/**
* rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
@@ -37,7 +35,7 @@ public:
dsn::blob extract_user_data(std::string &&value) override;
void update_field(std::string &value, std::unique_ptr<value_field> field) override;
rocksdb::SliceParts generate_value(const value_params &params) override;
- data_version version() const override { return VERSION_0; }
+ data_version version() const override { return data_version::VERSION_0; }
private:
std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v1.cpp
similarity index 67%
copy from src/base/value_schema_v0.cpp
copy to src/base/value_schema_v1.cpp
index 220c3b6..6dba1d4 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v1.cpp
@@ -17,13 +17,15 @@
* under the License.
*/
-#include "value_schema_v0.h"
+#include "value_schema_v1.h"
+#include <dsn/utility/endians.h>
#include <dsn/dist/fmt_logging.h>
+#include <dsn/c/api_utilities.h>
#include <dsn/utility/smart_pointers.h>
namespace pegasus {
-std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view value,
+std::unique_ptr<value_field> value_schema_v1::extract_field(dsn::string_view value,
value_field_type type)
{
std::unique_ptr<value_field> field = nullptr;
@@ -31,20 +33,22 @@ std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view val
case value_field_type::EXPIRE_TIMESTAMP:
field = extract_timestamp(value);
break;
+ case value_field_type::TIME_TAG:
+ field = extract_time_tag(value);
+ break;
default:
dassert_f(false, "Unsupported field type: {}", type);
}
return field;
}
-dsn::blob value_schema_v0::extract_user_data(std::string &&value)
+dsn::blob value_schema_v1::extract_user_data(std::string &&value)
{
auto ret = dsn::blob::create_from_bytes(std::move(value));
- ret.range(sizeof(uint32_t));
- return ret;
+ return ret.range(sizeof(uint32_t) + sizeof(uint64_t));
}
-void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
+void value_schema_v1::update_field(std::string &value, std::unique_ptr<value_field> field)
{
auto type = field->type();
switch (field->type()) {
@@ -56,19 +60,24 @@ void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_fie
}
}
-rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
+rocksdb::SliceParts value_schema_v1::generate_value(const value_params &params)
{
auto expire_ts_field = static_cast<expire_timestamp_field *>(
params.fields[value_field_type::EXPIRE_TIMESTAMP].get());
+ auto timetag_field =
+ static_cast<time_tag_field *>(params.fields[value_field_type::TIME_TAG].get());
auto data_field =
static_cast<user_data_field *>(params.fields[value_field_type::USER_DATA].get());
- if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr)) {
- dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP is not provided");
+ if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr ||
+ timetag_field == nullptr)) {
+ dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP or TIME_TAG is not provided");
return {nullptr, 0};
}
- params.write_buf.resize(sizeof(uint32_t));
- dsn::data_output(params.write_buf).write_u32(expire_ts_field->expire_ts);
+ params.write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
+ dsn::data_output(params.write_buf)
+ .write_u32(expire_ts_field->expire_ts)
+ .write_u64(timetag_field->time_tag);
params.write_slices.clear();
params.write_slices.emplace_back(params.write_buf.data(), params.write_buf.size());
@@ -79,19 +88,25 @@ rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
return {&params.write_slices[0], static_cast<int>(params.write_slices.size())};
}
-std::unique_ptr<value_field> value_schema_v0::extract_timestamp(dsn::string_view value)
+std::unique_ptr<value_field> value_schema_v1::extract_timestamp(dsn::string_view value)
{
uint32_t expire_ts = dsn::data_input(value).read_u32();
return dsn::make_unique<expire_timestamp_field>(expire_ts);
}
-void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
+std::unique_ptr<value_field> value_schema_v1::extract_time_tag(dsn::string_view value)
+{
+ dsn::data_input input(value);
+ input.skip(sizeof(uint32_t));
+ return dsn::make_unique<time_tag_field>(input.read_u64());
+}
+
+void value_schema_v1::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
{
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
auto expire_field = static_cast<expire_timestamp_field *>(field.get());
- auto new_expire_ts = expire_field->expire_ts;
- new_expire_ts = dsn::endian::hton(new_expire_ts);
+ auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
}
diff --git a/src/base/value_schema_v0.h b/src/base/value_schema_v1.h
similarity index 81%
copy from src/base/value_schema_v0.h
copy to src/base/value_schema_v1.h
index 5d7712b..05756d9 100644
--- a/src/base/value_schema_v0.h
+++ b/src/base/value_schema_v1.h
@@ -21,26 +21,26 @@
#include "pegasus_value_schema.h"
-#include <dsn/utility/singleton.h>
-
namespace pegasus {
/**
- * rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
+ * rocksdb value: |- expire_ts(4bytes) -|- timetag(8 bytes) -|- user value(bytes) -|
*/
-class value_schema_v0 : public value_schema
+class value_schema_v1 : public value_schema
{
public:
- value_schema_v0() = default;
+ value_schema_v1() = default;
std::unique_ptr<value_field> extract_field(dsn::string_view value,
value_field_type type) override;
dsn::blob extract_user_data(std::string &&value) override;
void update_field(std::string &value, std::unique_ptr<value_field> field) override;
rocksdb::SliceParts generate_value(const value_params &params) override;
- data_version version() const override { return VERSION_0; }
+ data_version version() const override { return data_version::VERSION_1; }
-private:
+protected:
std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
+ std::unique_ptr<value_field> extract_time_tag(dsn::string_view value);
void update_expire_ts(std::string &value, std::unique_ptr<value_field> field);
};
+
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org