You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2021/05/26 07:32:06 UTC

[incubator-pegasus] branch master updated: refactor: implement value schema 1 (#735)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c505b2  refactor: implement value schema 1 (#735)
2c505b2 is described below

commit 2c505b2228ad275284e6bc05090f74947d8b3cb9
Author: zhao liwei <zl...@163.com>
AuthorDate: Wed May 26 15:31:59 2021 +0800

    refactor: implement value schema 1 (#735)
---
 rdsn                                               |  2 +-
 src/base/pegasus_value_schema.h                    |  3 +-
 src/base/test/value_manager_test.cpp               |  4 +-
 src/base/test/value_schema_test.cpp                |  7 +++-
 src/base/value_schema_manager.cpp                  |  4 +-
 src/base/value_schema_v0.cpp                       |  6 +--
 src/base/value_schema_v0.h                         |  4 +-
 .../{value_schema_v0.cpp => value_schema_v1.cpp}   | 45 ++++++++++++++--------
 src/base/{value_schema_v0.h => value_schema_v1.h}  | 14 +++----
 9 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/rdsn b/rdsn
index b4b9a4e..cd0a237 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit b4b9a4ecc499289fe651b2bd1f3ab114baa88604
+Subproject commit cd0a237b20a2fb120ec9b32d4b133e04518def8a
diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 3bd2f9f..f278e46 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -239,8 +239,9 @@ private:
 enum data_version
 {
     VERSION_0 = 0,
+    VERSION_1 = 1,
     VERSION_COUNT,
-    VERSION_MAX = VERSION_0,
+    VERSION_MAX = VERSION_1,
     /// TBD(zlw)
 };
 
diff --git a/src/base/test/value_manager_test.cpp b/src/base/test/value_manager_test.cpp
index 30ed3c1..570b20b 100644
--- a/src/base/test/value_manager_test.cpp
+++ b/src/base/test/value_manager_test.cpp
@@ -40,7 +40,9 @@ TEST(value_schema_manager, get_value_schema)
         uint32_t version;
         bool schema_exist;
     } tests[] = {
-        {pegasus::data_version::VERSION_0, true}, {pegasus::data_version::VERSION_MAX + 1, false},
+        {pegasus::data_version::VERSION_0, true},
+        {pegasus::data_version::VERSION_1, true},
+        {pegasus::data_version::VERSION_MAX + 1, false},
     };
 
     for (const auto &t : tests) {
diff --git a/src/base/test/value_schema_test.cpp b/src/base/test/value_schema_test.cpp
index aa1a40e..2facef4 100644
--- a/src/base/test/value_schema_test.cpp
+++ b/src/base/test/value_schema_test.cpp
@@ -72,6 +72,11 @@ TEST(value_schema, generate_and_extract)
         {0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
         {0, std::numeric_limits<uint32_t>::max(), 0, ""},
         {0, 0, 0, "a"},
+
+        {1, 1000, 10001, ""},
+        {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
+        {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},
+        {1, 0, 0, "a"},
     };
 
     for (const auto &t : tests) {
@@ -96,7 +101,7 @@ TEST(value_schema, update_expire_ts)
         uint32_t expire_ts;
         uint32_t update_expire_ts;
     } tests[] = {
-        {0, 1000, 10086},
+        {0, 1000, 10086}, {1, 1000, 10086},
     };
 
     for (const auto &t : tests) {
diff --git a/src/base/value_schema_manager.cpp b/src/base/value_schema_manager.cpp
index 4caaffe..ca381ef 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -19,6 +19,7 @@
 
 #include "value_schema_manager.h"
 #include "value_schema_v0.h"
+#include "value_schema_v1.h"
 
 namespace pegasus {
 value_schema_manager::value_schema_manager()
@@ -27,7 +28,8 @@ value_schema_manager::value_schema_manager()
      * If someone wants to add a new data version, he only need to implement the new value schema,
      * and register it here.
      */
-    value_schema_manager::instance().register_schema(dsn::make_unique<value_schema_v0>());
+    register_schema(dsn::make_unique<value_schema_v0>());
+    register_schema(dsn::make_unique<value_schema_v1>());
 }
 
 void value_schema_manager::register_schema(std::unique_ptr<value_schema> schema)
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
index 220c3b6..850c07c 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v0.cpp
@@ -40,8 +40,7 @@ std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view val
 dsn::blob value_schema_v0::extract_user_data(std::string &&value)
 {
     auto ret = dsn::blob::create_from_bytes(std::move(value));
-    ret.range(sizeof(uint32_t));
-    return ret;
+    return ret.range(sizeof(uint32_t));
 }
 
 void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
@@ -90,8 +89,7 @@ void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value
     dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
     auto expire_field = static_cast<expire_timestamp_field *>(field.get());
 
-    auto new_expire_ts = expire_field->expire_ts;
-    new_expire_ts = dsn::endian::hton(new_expire_ts);
+    auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
     memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
 }
 
diff --git a/src/base/value_schema_v0.h b/src/base/value_schema_v0.h
index 5d7712b..4fbde67 100644
--- a/src/base/value_schema_v0.h
+++ b/src/base/value_schema_v0.h
@@ -21,8 +21,6 @@
 
 #include "pegasus_value_schema.h"
 
-#include <dsn/utility/singleton.h>
-
 namespace pegasus {
 /**
  *  rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
@@ -37,7 +35,7 @@ public:
     dsn::blob extract_user_data(std::string &&value) override;
     void update_field(std::string &value, std::unique_ptr<value_field> field) override;
     rocksdb::SliceParts generate_value(const value_params &params) override;
-    data_version version() const override { return VERSION_0; }
+    data_version version() const override { return data_version::VERSION_0; }
 
 private:
     std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v1.cpp
similarity index 67%
copy from src/base/value_schema_v0.cpp
copy to src/base/value_schema_v1.cpp
index 220c3b6..6dba1d4 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v1.cpp
@@ -17,13 +17,15 @@
  * under the License.
  */
 
-#include "value_schema_v0.h"
+#include "value_schema_v1.h"
 
+#include <dsn/utility/endians.h>
 #include <dsn/dist/fmt_logging.h>
+#include <dsn/c/api_utilities.h>
 #include <dsn/utility/smart_pointers.h>
 
 namespace pegasus {
-std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view value,
+std::unique_ptr<value_field> value_schema_v1::extract_field(dsn::string_view value,
                                                             value_field_type type)
 {
     std::unique_ptr<value_field> field = nullptr;
@@ -31,20 +33,22 @@ std::unique_ptr<value_field> value_schema_v0::extract_field(dsn::string_view val
     case value_field_type::EXPIRE_TIMESTAMP:
         field = extract_timestamp(value);
         break;
+    case value_field_type::TIME_TAG:
+        field = extract_time_tag(value);
+        break;
     default:
         dassert_f(false, "Unsupported field type: {}", type);
     }
     return field;
 }
 
-dsn::blob value_schema_v0::extract_user_data(std::string &&value)
+dsn::blob value_schema_v1::extract_user_data(std::string &&value)
 {
     auto ret = dsn::blob::create_from_bytes(std::move(value));
-    ret.range(sizeof(uint32_t));
-    return ret;
+    return ret.range(sizeof(uint32_t) + sizeof(uint64_t));
 }
 
-void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_field> field)
+void value_schema_v1::update_field(std::string &value, std::unique_ptr<value_field> field)
 {
     auto type = field->type();
     switch (field->type()) {
@@ -56,19 +60,24 @@ void value_schema_v0::update_field(std::string &value, std::unique_ptr<value_fie
     }
 }
 
-rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
+rocksdb::SliceParts value_schema_v1::generate_value(const value_params &params)
 {
     auto expire_ts_field = static_cast<expire_timestamp_field *>(
         params.fields[value_field_type::EXPIRE_TIMESTAMP].get());
+    auto timetag_field =
+        static_cast<time_tag_field *>(params.fields[value_field_type::TIME_TAG].get());
     auto data_field =
         static_cast<user_data_field *>(params.fields[value_field_type::USER_DATA].get());
-    if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr)) {
-        dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP is not provided");
+    if (dsn_unlikely(expire_ts_field == nullptr || data_field == nullptr ||
+                     timetag_field == nullptr)) {
+        dassert_f(false, "USER_DATA or EXPIRE_TIMESTAMP or TIME_TAG is not provided");
         return {nullptr, 0};
     }
 
-    params.write_buf.resize(sizeof(uint32_t));
-    dsn::data_output(params.write_buf).write_u32(expire_ts_field->expire_ts);
+    params.write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
+    dsn::data_output(params.write_buf)
+        .write_u32(expire_ts_field->expire_ts)
+        .write_u64(timetag_field->time_tag);
     params.write_slices.clear();
     params.write_slices.emplace_back(params.write_buf.data(), params.write_buf.size());
 
@@ -79,19 +88,25 @@ rocksdb::SliceParts value_schema_v0::generate_value(const value_params &params)
     return {&params.write_slices[0], static_cast<int>(params.write_slices.size())};
 }
 
-std::unique_ptr<value_field> value_schema_v0::extract_timestamp(dsn::string_view value)
+std::unique_ptr<value_field> value_schema_v1::extract_timestamp(dsn::string_view value)
 {
     uint32_t expire_ts = dsn::data_input(value).read_u32();
     return dsn::make_unique<expire_timestamp_field>(expire_ts);
 }
 
-void value_schema_v0::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
+std::unique_ptr<value_field> value_schema_v1::extract_time_tag(dsn::string_view value)
+{
+    dsn::data_input input(value);
+    input.skip(sizeof(uint32_t));
+    return dsn::make_unique<time_tag_field>(input.read_u64());
+}
+
+void value_schema_v1::update_expire_ts(std::string &value, std::unique_ptr<value_field> field)
 {
     dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
     auto expire_field = static_cast<expire_timestamp_field *>(field.get());
 
-    auto new_expire_ts = expire_field->expire_ts;
-    new_expire_ts = dsn::endian::hton(new_expire_ts);
+    auto new_expire_ts = dsn::endian::hton(expire_field->expire_ts);
     memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
 }
 
diff --git a/src/base/value_schema_v0.h b/src/base/value_schema_v1.h
similarity index 81%
copy from src/base/value_schema_v0.h
copy to src/base/value_schema_v1.h
index 5d7712b..05756d9 100644
--- a/src/base/value_schema_v0.h
+++ b/src/base/value_schema_v1.h
@@ -21,26 +21,26 @@
 
 #include "pegasus_value_schema.h"
 
-#include <dsn/utility/singleton.h>
-
 namespace pegasus {
 /**
- *  rocksdb value: |- expire_ts(4bytes) -|- user value(bytes) -|
+ *  rocksdb value: |- expire_ts(4bytes) -|- timetag(8 bytes) -|- user value(bytes) -|
  */
-class value_schema_v0 : public value_schema
+class value_schema_v1 : public value_schema
 {
 public:
-    value_schema_v0() = default;
+    value_schema_v1() = default;
 
     std::unique_ptr<value_field> extract_field(dsn::string_view value,
                                                value_field_type type) override;
     dsn::blob extract_user_data(std::string &&value) override;
     void update_field(std::string &value, std::unique_ptr<value_field> field) override;
     rocksdb::SliceParts generate_value(const value_params &params) override;
-    data_version version() const override { return VERSION_0; }
+    data_version version() const override { return data_version::VERSION_1; }
 
-private:
+protected:
     std::unique_ptr<value_field> extract_timestamp(dsn::string_view value);
+    std::unique_ptr<value_field> extract_time_tag(dsn::string_view value);
     void update_expire_ts(std::string &value, std::unique_ptr<value_field> field);
 };
+
 } // namespace pegasus

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org