You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/08 03:16:00 UTC

[incubator-pegasus] branch data-version-2 updated: feat: reimplement all of these interfaces about pegasus value (#743)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch data-version-2
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/data-version-2 by this push:
     new 4146b93  feat: reimplement all of these interfaces about pegasus value (#743)
4146b93 is described below

commit 4146b93c4dd101ab57ffdb84029d67f5adf845f4
Author: zhao liwei <zl...@163.com>
AuthorDate: Tue Jun 8 11:15:34 2021 +0800

    feat: reimplement all of these interfaces about pegasus value (#743)
---
 src/base/pegasus_value_schema.h               |  94 -------------------
 src/base/test/value_manager_test.cpp          | 130 ++++++++++++++++++++++++++
 src/base/value_schema_manager.h               |  70 ++++++++++++++
 src/server/key_ttl_compaction_filter.h        |   1 +
 src/server/pegasus_server_impl.cpp            |   4 +-
 src/server/pegasus_server_impl.h              |   1 +
 src/server/pegasus_server_impl_init.cpp       |   2 +-
 src/server/test/pegasus_value_schema_test.cpp |   1 +
 src/shell/commands/debugger.cpp               |   1 +
 9 files changed, 207 insertions(+), 97 deletions(-)

diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 0202e6a..244eb27 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -36,100 +36,6 @@
 
 namespace pegasus {
 
-constexpr int PEGASUS_DATA_VERSION_MAX = 1u;
-
-/// Generates timetag in host endian.
-/// \see comment on pegasus_value_generator::generate_value_v1
-inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
-{
-    return timestamp << 8u | cluster_id << 1u | deleted_tag;
-}
-
-inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
-{
-    // 56bit: 0xFFFFFFFFFFFFFFL
-    return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
-}
-
-/// Extracts expire_ts from rocksdb value with given version.
-/// The value schema must be in v0 or v1.
-/// \return expire_ts in host endian
-inline uint32_t pegasus_extract_expire_ts(uint32_t version, dsn::string_view value)
-{
-    dassert_f(version <= PEGASUS_DATA_VERSION_MAX,
-              "data version({}) must be <= {}",
-              version,
-              PEGASUS_DATA_VERSION_MAX);
-
-    return dsn::data_input(value).read_u32();
-}
-
-/// Extracts user value from a raw rocksdb value.
-/// In order to avoid data copy, the ownership of `raw_value` will be transferred
-/// into `user_data`.
-/// \param user_data: the result.
-inline void
-pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob &user_data)
-{
-    dassert_f(version <= PEGASUS_DATA_VERSION_MAX,
-              "data version({}) must be <= {}",
-              version,
-              PEGASUS_DATA_VERSION_MAX);
-
-    auto *s = new std::string(std::move(raw_value));
-    dsn::data_input input(*s);
-    input.skip(sizeof(uint32_t));
-    if (version == 1) {
-        input.skip(sizeof(uint64_t));
-    }
-    dsn::string_view view = input.read_str();
-
-    // tricky code to avoid memory copy
-    std::shared_ptr<char> buf(const_cast<char *>(view.data()), [s](char *) { delete s; });
-    user_data.assign(std::move(buf), 0, static_cast<unsigned int>(view.length()));
-}
-
-/// Extracts timetag from a v1 value.
-inline uint64_t pegasus_extract_timetag(int version, dsn::string_view value)
-{
-    dassert(version == 1, "data version(%d) must be v1", version);
-
-    dsn::data_input input(value);
-    input.skip(sizeof(uint32_t));
-
-    return input.read_u64();
-}
-
-/// Update expire_ts in rocksdb value with given version.
-/// The value schema must be in v0 or v1.
-inline void pegasus_update_expire_ts(uint32_t version, std::string &value, uint32_t new_expire_ts)
-{
-    if (version == 0 || version == 1) {
-        dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
-
-        new_expire_ts = dsn::endian::hton(new_expire_ts);
-        memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
-    } else {
-        dfatal_f("unsupported value schema version: {}", version);
-        __builtin_unreachable();
-    }
-}
-
-/// \return true if expired
-inline bool check_if_ts_expired(uint32_t epoch_now, uint32_t expire_ts)
-{
-    return expire_ts > 0 && expire_ts <= epoch_now;
-}
-
-/// \return true if expired
-inline bool check_if_record_expired(uint32_t value_schema_version,
-                                    uint32_t epoch_now,
-                                    dsn::string_view raw_value)
-{
-    return check_if_ts_expired(epoch_now,
-                               pegasus_extract_expire_ts(value_schema_version, raw_value));
-}
-
 /// Helper class for generating value.
 /// NOTES:
 /// * the instance of pegasus_value_generator must be alive while the returned SliceParts is.
diff --git a/src/base/test/value_manager_test.cpp b/src/base/test/value_manager_test.cpp
index 2474ffd..59a1c99 100644
--- a/src/base/test/value_manager_test.cpp
+++ b/src/base/test/value_manager_test.cpp
@@ -83,3 +83,133 @@ TEST(pegasus_value_manager, get_value_schema)
         ASSERT_EQ(t.expect_version, schema->version());
     }
 }
+
+TEST(pegasus_value_manager, check_if_ts_expired)
+{
+    struct test_case
+    {
+        uint32_t epoch_now;
+        uint32_t expire_ts;
+        bool res_value;
+    } tests[] = {
+        {0, UINT32_MAX, false},
+        {100, 100, true},
+        {100, 99, true},
+        {100, 101, false},
+    };
+
+    for (const auto &t : tests) {
+        ASSERT_EQ(t.res_value, check_if_ts_expired(t.epoch_now, t.expire_ts));
+    }
+}
+
+TEST(pegasus_value_manager, extract_timestamp_from_timetag)
+{
+    uint64_t deadbeaf = 0xdeadbeaf;
+    uint64_t timetag = deadbeaf << 8u | 0xab;
+    auto res = extract_timestamp_from_timetag(timetag);
+    ASSERT_EQ(res, deadbeaf);
+}
+
+TEST(pegasus_value_manager, generate_timetag)
+{
+    uint64_t timestamp = 10086;
+    uint8_t cluster_id = 1;
+    bool deleted_tag = false;
+    auto res = generate_timetag(timestamp, cluster_id, deleted_tag);
+
+    ASSERT_EQ(res >> 8u, timestamp);
+    ASSERT_EQ((res & 0xFF) >> 1u, cluster_id);
+    ASSERT_EQ(res & 0x01, deleted_tag);
+}
+
+TEST(pegasus_value_manager, pegasus_extract_expire_ts)
+{
+    struct test_case
+    {
+        uint32_t version;
+        uint32_t expire_ts;
+    } tests[] = {
+        {0, 10086},
+        {1, 10086},
+        {2, 10086},
+    };
+
+    for (const auto &test : tests) {
+        // generate data for test
+        auto schema = value_schema_manager::instance().get_value_schema(test.version);
+        auto value = generate_value(schema, test.expire_ts, 0, "user_data");
+
+        auto expect_expire_ts = pegasus_extract_expire_ts(test.version, value);
+        ASSERT_EQ(expect_expire_ts, test.expire_ts);
+    }
+}
+
+TEST(pegasus_value_manager, pegasus_extract_timetag)
+{
+    struct test_case
+    {
+        uint32_t version;
+        uint32_t time_tag;
+    } tests[] = {
+        {2, 10086},
+    };
+
+    for (const auto &test : tests) {
+        // generate data for test
+        auto schema = value_schema_manager::instance().get_value_schema(test.version);
+        auto value = generate_value(schema, 0, test.time_tag, "user_data");
+
+        auto time_tag = pegasus_extract_timetag(test.version, value);
+        ASSERT_EQ(time_tag, test.time_tag);
+    }
+}
+
+TEST(pegasus_value_manager, pegasus_extract_user_data)
+{
+    struct test_case
+    {
+        uint32_t version;
+        std::string user_data;
+    } tests[] = {
+        {0, "user_data"},
+        {1, "user_data"},
+        {2, "user_data"},
+    };
+
+    for (const auto &test : tests) {
+        // generate data for test
+        auto schema = value_schema_manager::instance().get_value_schema(test.version);
+        auto value = generate_value(schema, 0, 0, test.user_data);
+
+        dsn::blob user_data;
+        pegasus_extract_user_data(test.version, std::move(value), user_data);
+        ASSERT_EQ(user_data, test.user_data);
+    }
+}
+
+TEST(pegasus_value_manager, pegasus_update_expire_ts)
+{
+    struct test_case
+    {
+        uint32_t version;
+        uint32_t expire_ts;
+        uint32_t update_expire_ts;
+    } tests[] = {
+        {0, 0, 10086},
+        {1, 0, 10086},
+        {2, 0, 10086},
+    };
+
+    for (const auto &test : tests) {
+        // generate data for test
+        auto schema = value_schema_manager::instance().get_value_schema(test.version);
+        auto value = generate_value(schema, test.expire_ts, 0, "user_data");
+
+        // update expire timestamp
+        pegasus_update_expire_ts(test.version, value, test.update_expire_ts);
+
+        auto expire_ts = pegasus_extract_expire_ts(test.version, value);
+        ASSERT_EQ(expire_ts, test.update_expire_ts);
+    }
+}
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 189d89e..4582835 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -40,4 +40,74 @@ private:
 
     std::array<std::unique_ptr<value_schema>, data_version::VERSION_COUNT> _schemas;
 };
+
+/// Generates timetag in host endian.
+/// \see comment on pegasus_value_generator::generate_value_v1
+inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
+{
+    return timestamp << 8u | cluster_id << 1u | deleted_tag;
+}
+
+inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
+{
+    // 56bit: 0xFFFFFFFFFFFFFFL
+    return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
+}
+
+/// Extracts expire_ts from a rocksdb value whose schema is resolved by value_schema_manager.
+/// \return expire_ts in host endian
+inline uint32_t pegasus_extract_expire_ts(uint32_t meta_cf_data_version, dsn::string_view value)
+{
+    auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+    // Keep the object returned by extract_field alive in a local until the field has been read.
+    auto &&field = schema->extract_field(value, value_field_type::EXPIRE_TIMESTAMP);
+    return static_cast<expire_timestamp_field *>(field.get())->expire_ts;
+}
+
+/// Extracts user value from a raw rocksdb value.
+/// In order to avoid data copy, the ownership of `raw_value` will be transferred
+/// into `user_data`.
+/// \param user_data: the result.
+inline void pegasus_extract_user_data(uint32_t meta_cf_data_version,
+                                      std::string &&raw_value,
+                                      ::dsn::blob &user_data)
+{
+    auto schema =
+        value_schema_manager::instance().get_value_schema(meta_cf_data_version, raw_value);
+    user_data = schema->extract_user_data(std::move(raw_value));
+}
+
+/// Extracts timetag from a value; the extracted field is held locally until it has been read.
+inline uint64_t pegasus_extract_timetag(int meta_cf_data_version, dsn::string_view value)
+{
+    auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+    auto &&field = schema->extract_field(value, value_field_type::TIME_TAG);
+    return static_cast<time_tag_field *>(field.get())->time_tag;
+}
+
+/// Update expire_ts in a rocksdb value; `value` is passed to the schema's update_field.
+/// The value schema is resolved by value_schema_manager from `meta_cf_data_version` and `value`.
+inline void
+pegasus_update_expire_ts(uint32_t meta_cf_data_version, std::string &value, uint32_t new_expire_ts)
+{
+    auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+    auto expire_ts_field = dsn::make_unique<expire_timestamp_field>(new_expire_ts);
+    schema->update_field(value, std::move(expire_ts_field));
+}
+
+/// \return true if expired
+inline bool check_if_ts_expired(uint32_t epoch_now, uint32_t expire_ts)
+{
+    return expire_ts > 0 && expire_ts <= epoch_now;
+}
+
+/// \return true if expired
+inline bool check_if_record_expired(uint32_t value_schema_version,
+                                    uint32_t epoch_now,
+                                    dsn::string_view raw_value)
+{
+    return check_if_ts_expired(epoch_now,
+                               pegasus_extract_expire_ts(value_schema_version, raw_value));
+}
+
 } // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 4f26844..1992e5a 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -27,6 +27,7 @@
 #include "base/pegasus_utils.h"
 #include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
 
 namespace pegasus {
 namespace server {
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 890d7bb..cbdf3cd 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1470,7 +1470,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         _usage_scenario = _meta_store->get_usage_scenario();
         uint64_t last_manual_compact_finish_time =
             _meta_store->get_last_manual_compact_finish_time();
-        if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
+        if (_pegasus_data_version > data_version::VERSION_MAX) {
             derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
             release_db();
             return ::dsn::ERR_LOCAL_APP_FAILURE;
@@ -1480,7 +1480,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
     } else {
         // Write initial meta data to meta CF and flush when create new DB.
-        _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
+        _meta_store->set_data_version(data_version::VERSION_MAX);
         _meta_store->set_last_flushed_decree(0);
         _meta_store->set_last_manual_compact_finish_time(0);
         flush_all_family_columns(true);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 4a26577..d73567e 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -36,6 +36,7 @@
 #include "pegasus_write_service.h"
 #include "range_read_limiter.h"
 #include "pegasus_read_service.h"
+#include "base/value_schema_manager.h"
 
 namespace pegasus {
 namespace server {
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 414e2c4..1800827 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -59,7 +59,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
       _data_cf(nullptr),
       _meta_cf(nullptr),
       _is_open(false),
-      _pegasus_data_version(PEGASUS_DATA_VERSION_MAX),
+      _pegasus_data_version(data_version::VERSION_MAX),
       _last_durable_decree(0),
       _is_checkpointing(false),
       _manual_compact_svc(this),
diff --git a/src/server/test/pegasus_value_schema_test.cpp b/src/server/test/pegasus_value_schema_test.cpp
index 77c8e46..08947e1 100644
--- a/src/server/test/pegasus_value_schema_test.cpp
+++ b/src/server/test/pegasus_value_schema_test.cpp
@@ -18,6 +18,7 @@
  */
 
 #include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
 
 #include <gtest/gtest.h>
 
diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp
index e3d8407..ed59532 100644
--- a/src/shell/commands/debugger.cpp
+++ b/src/shell/commands/debugger.cpp
@@ -19,6 +19,7 @@
 
 #include "shell/commands.h"
 #include "base/idl_utils.h"
+#include "base/value_schema_manager.h"
 #include <rocksdb/sst_dump_tool.h>
 #include <rocksdb/utilities/ldb_cmd.h>
 #include <fmt/time.h>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org