You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/08 03:16:00 UTC
[incubator-pegasus] branch data-version-2 updated: feat:
reimplement all of these interfaces about pegasus value (#743)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch data-version-2
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/data-version-2 by this push:
new 4146b93 feat: reimplement all of these interfaces about pegasus value (#743)
4146b93 is described below
commit 4146b93c4dd101ab57ffdb84029d67f5adf845f4
Author: zhao liwei <zl...@163.com>
AuthorDate: Tue Jun 8 11:15:34 2021 +0800
feat: reimplement all of these interfaces about pegasus value (#743)
---
src/base/pegasus_value_schema.h | 94 -------------------
src/base/test/value_manager_test.cpp | 130 ++++++++++++++++++++++++++
src/base/value_schema_manager.h | 70 ++++++++++++++
src/server/key_ttl_compaction_filter.h | 1 +
src/server/pegasus_server_impl.cpp | 4 +-
src/server/pegasus_server_impl.h | 1 +
src/server/pegasus_server_impl_init.cpp | 2 +-
src/server/test/pegasus_value_schema_test.cpp | 1 +
src/shell/commands/debugger.cpp | 1 +
9 files changed, 207 insertions(+), 97 deletions(-)
diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 0202e6a..244eb27 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -36,100 +36,6 @@
namespace pegasus {
-constexpr int PEGASUS_DATA_VERSION_MAX = 1u;
-
-/// Generates timetag in host endian.
-/// \see comment on pegasus_value_generator::generate_value_v1
-inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
-{
- return timestamp << 8u | cluster_id << 1u | deleted_tag;
-}
-
-inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
-{
- // 56bit: 0xFFFFFFFFFFFFFFL
- return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
-}
-
-/// Extracts expire_ts from rocksdb value with given version.
-/// The value schema must be in v0 or v1.
-/// \return expire_ts in host endian
-inline uint32_t pegasus_extract_expire_ts(uint32_t version, dsn::string_view value)
-{
- dassert_f(version <= PEGASUS_DATA_VERSION_MAX,
- "data version({}) must be <= {}",
- version,
- PEGASUS_DATA_VERSION_MAX);
-
- return dsn::data_input(value).read_u32();
-}
-
-/// Extracts user value from a raw rocksdb value.
-/// In order to avoid data copy, the ownership of `raw_value` will be transferred
-/// into `user_data`.
-/// \param user_data: the result.
-inline void
-pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob &user_data)
-{
- dassert_f(version <= PEGASUS_DATA_VERSION_MAX,
- "data version({}) must be <= {}",
- version,
- PEGASUS_DATA_VERSION_MAX);
-
- auto *s = new std::string(std::move(raw_value));
- dsn::data_input input(*s);
- input.skip(sizeof(uint32_t));
- if (version == 1) {
- input.skip(sizeof(uint64_t));
- }
- dsn::string_view view = input.read_str();
-
- // tricky code to avoid memory copy
- std::shared_ptr<char> buf(const_cast<char *>(view.data()), [s](char *) { delete s; });
- user_data.assign(std::move(buf), 0, static_cast<unsigned int>(view.length()));
-}
-
-/// Extracts timetag from a v1 value.
-inline uint64_t pegasus_extract_timetag(int version, dsn::string_view value)
-{
- dassert(version == 1, "data version(%d) must be v1", version);
-
- dsn::data_input input(value);
- input.skip(sizeof(uint32_t));
-
- return input.read_u64();
-}
-
-/// Update expire_ts in rocksdb value with given version.
-/// The value schema must be in v0 or v1.
-inline void pegasus_update_expire_ts(uint32_t version, std::string &value, uint32_t new_expire_ts)
-{
- if (version == 0 || version == 1) {
- dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");
-
- new_expire_ts = dsn::endian::hton(new_expire_ts);
- memcpy(const_cast<char *>(value.data()), &new_expire_ts, sizeof(uint32_t));
- } else {
- dfatal_f("unsupported value schema version: {}", version);
- __builtin_unreachable();
- }
-}
-
-/// \return true if expired
-inline bool check_if_ts_expired(uint32_t epoch_now, uint32_t expire_ts)
-{
- return expire_ts > 0 && expire_ts <= epoch_now;
-}
-
-/// \return true if expired
-inline bool check_if_record_expired(uint32_t value_schema_version,
- uint32_t epoch_now,
- dsn::string_view raw_value)
-{
- return check_if_ts_expired(epoch_now,
- pegasus_extract_expire_ts(value_schema_version, raw_value));
-}
-
/// Helper class for generating value.
/// NOTES:
/// * the instance of pegasus_value_generator must be alive while the returned SliceParts is.
diff --git a/src/base/test/value_manager_test.cpp b/src/base/test/value_manager_test.cpp
index 2474ffd..59a1c99 100644
--- a/src/base/test/value_manager_test.cpp
+++ b/src/base/test/value_manager_test.cpp
@@ -83,3 +83,133 @@ TEST(pegasus_value_manager, get_value_schema)
ASSERT_EQ(t.expect_version, schema->version());
}
}
+
+TEST(pegasus_value_manager, check_if_ts_expired)
+{
+ struct test_case
+ {
+ uint32_t epoch_now;
+ uint32_t expire_ts;
+ bool res_value;
+ } tests[] = {
+ {0, UINT32_MAX, false},
+ {100, 100, true},
+ {100, 99, true},
+ {100, 101, false},
+ };
+
+ for (const auto &t : tests) {
+ ASSERT_EQ(t.res_value, check_if_ts_expired(t.epoch_now, t.expire_ts));
+ }
+}
+
+TEST(pegasus_value_manager, extract_timestamp_from_timetag)
+{
+ uint64_t deadbeaf = 0xdeadbeaf;
+ uint64_t timetag = deadbeaf << 8u | 0xab;
+ auto res = extract_timestamp_from_timetag(timetag);
+ ASSERT_EQ(res, deadbeaf);
+}
+
+TEST(pegasus_value_manager, generate_timetag)
+{
+ uint64_t timestamp = 10086;
+ uint8_t cluster_id = 1;
+ bool deleted_tag = false;
+ auto res = generate_timetag(timestamp, cluster_id, deleted_tag);
+
+ ASSERT_EQ(res >> 8u, timestamp);
+ ASSERT_EQ((res & 0xFF) >> 1u, cluster_id);
+ ASSERT_EQ(res & 0x01, deleted_tag);
+}
+
+TEST(pegasus_value_manager, pegasus_extract_expire_ts)
+{
+ struct test_case
+ {
+ uint32_t version;
+ uint32_t expire_ts;
+ } tests[] = {
+ {0, 10086},
+ {1, 10086},
+ {2, 10086},
+ };
+
+ for (const auto &test : tests) {
+ // generate data for test
+ auto schema = value_schema_manager::instance().get_value_schema(test.version);
+ auto value = generate_value(schema, test.expire_ts, 0, "user_data");
+
+ auto expect_expire_ts = pegasus_extract_expire_ts(test.version, value);
+ ASSERT_EQ(expect_expire_ts, test.expire_ts);
+ }
+}
+
+TEST(pegasus_value_manager, pegasus_extract_timetag)
+{
+ struct test_case
+ {
+ uint32_t version;
+ uint32_t time_tag;
+ } tests[] = {
+ {2, 10086},
+ };
+
+ for (const auto &test : tests) {
+ // generate data for test
+ auto schema = value_schema_manager::instance().get_value_schema(test.version);
+ auto value = generate_value(schema, 0, test.time_tag, "user_data");
+
+ auto time_tag = pegasus_extract_timetag(test.version, value);
+ ASSERT_EQ(time_tag, test.time_tag);
+ }
+}
+
+TEST(pegasus_value_manager, pegasus_extract_user_data)
+{
+ struct test_case
+ {
+ uint32_t version;
+ std::string user_data;
+ } tests[] = {
+ {0, "user_data"},
+ {1, "user_data"},
+ {2, "user_data"},
+ };
+
+ for (const auto &test : tests) {
+ // generate data for test
+ auto schema = value_schema_manager::instance().get_value_schema(test.version);
+ auto value = generate_value(schema, 0, 0, test.user_data);
+
+ dsn::blob user_data;
+ pegasus_extract_user_data(test.version, std::move(value), user_data);
+ ASSERT_EQ(user_data, test.user_data);
+ }
+}
+
+TEST(pegasus_value_manager, pegasus_update_expire_ts)
+{
+ struct test_case
+ {
+ uint32_t version;
+ uint32_t expire_ts;
+ uint32_t update_expire_ts;
+ } tests[] = {
+ {0, 0, 10086},
+ {1, 0, 10086},
+ {2, 0, 10086},
+ };
+
+ for (const auto &test : tests) {
+ // generate data for test
+ auto schema = value_schema_manager::instance().get_value_schema(test.version);
+ auto value = generate_value(schema, test.expire_ts, 0, "user_data");
+
+ // update expire timestamp
+ pegasus_update_expire_ts(test.version, value, test.update_expire_ts);
+
+ auto expire_ts = pegasus_extract_expire_ts(test.version, value);
+ ASSERT_EQ(expire_ts, test.update_expire_ts);
+ }
+}
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 189d89e..4582835 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -40,4 +40,74 @@ private:
std::array<std::unique_ptr<value_schema>, data_version::VERSION_COUNT> _schemas;
};
+
+/// Generates timetag in host endian: timestamp from bit 8 up, cluster_id from bit 1, deleted_tag in bit 0.
+/// \see comment on pegasus_value_generator::generate_value_v1
+inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
+{
+ return timestamp << 8u | cluster_id << 1u | deleted_tag; // NOTE(review): cluster_id >= 128 reaches bit 8 (timestamp) - confirm callers keep it within 7 bits
+}
+
+inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
+{
+ // Drop the low 8 bits, then keep the remaining 56 bits (mask 0xFFFFFFFFFFFFFF).
+ return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
+}
+
+/// Extracts expire_ts from rocksdb value with given version.
+/// The value schema is resolved from `meta_cf_data_version` and `value`.
+/// \return expire_ts in host endian
+inline uint32_t pegasus_extract_expire_ts(uint32_t meta_cf_data_version, dsn::string_view value)
+{
+ auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+ auto field = schema->extract_field(value, value_field_type::EXPIRE_TIMESTAMP); // keep the owner alive; `.get()` on the temporary would dangle
+ return static_cast<expire_timestamp_field *>(field.get())->expire_ts;
+}
+
+/// Extracts user value from a raw rocksdb value.
+/// In order to avoid data copy, the ownership of `raw_value` will be transferred
+/// into `user_data`; the schema is looked up first, before `raw_value` is moved from.
+/// \param user_data: the result.
+inline void pegasus_extract_user_data(uint32_t meta_cf_data_version,
+ std::string &&raw_value,
+ ::dsn::blob &user_data)
+{
+ auto schema =
+ value_schema_manager::instance().get_value_schema(meta_cf_data_version, raw_value);
+ user_data = schema->extract_user_data(std::move(raw_value));
+}
+
+/// Extracts timetag from a value; the resolved schema is expected to provide a TIME_TAG field (the result is cast unchecked).
+inline uint64_t pegasus_extract_timetag(int meta_cf_data_version, dsn::string_view value)
+{
+ auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+ auto field = schema->extract_field(value, value_field_type::TIME_TAG); // keep the owner alive; `.get()` on the temporary would dangle
+ return static_cast<time_tag_field *>(field.get())->time_tag;
+}
+
+/// Update expire_ts in rocksdb value with given version.
+/// The schema is resolved from `meta_cf_data_version` and `value`; `value` is passed by reference to the schema's update_field.
+inline void
+pegasus_update_expire_ts(uint32_t meta_cf_data_version, std::string &value, uint32_t new_expire_ts)
+{
+ auto schema = value_schema_manager::instance().get_value_schema(meta_cf_data_version, value);
+ auto expire_ts_field = dsn::make_unique<expire_timestamp_field>(new_expire_ts);
+ schema->update_field(value, std::move(expire_ts_field));
+}
+
+/// \return true if expired, i.e. expire_ts is non-zero and not later than epoch_now (expire_ts == 0 never expires)
+inline bool check_if_ts_expired(uint32_t epoch_now, uint32_t expire_ts)
+{
+ return expire_ts > 0 && expire_ts <= epoch_now;
+}
+
+/// \return true if the expire_ts extracted from `raw_value` is expired relative to `epoch_now`
+inline bool check_if_record_expired(uint32_t value_schema_version,
+ uint32_t epoch_now,
+ dsn::string_view raw_value)
+{
+ return check_if_ts_expired(epoch_now,
+ pegasus_extract_expire_ts(value_schema_version, raw_value));
+}
+
} // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 4f26844..1992e5a 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -27,6 +27,7 @@
#include "base/pegasus_utils.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
namespace pegasus {
namespace server {
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 890d7bb..cbdf3cd 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1470,7 +1470,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
_usage_scenario = _meta_store->get_usage_scenario();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
- if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
+ if (_pegasus_data_version > data_version::VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
@@ -1480,7 +1480,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
} else {
// Write initial meta data to meta CF and flush when create new DB.
- _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
+ _meta_store->set_data_version(data_version::VERSION_MAX);
_meta_store->set_last_flushed_decree(0);
_meta_store->set_last_manual_compact_finish_time(0);
flush_all_family_columns(true);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 4a26577..d73567e 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -36,6 +36,7 @@
#include "pegasus_write_service.h"
#include "range_read_limiter.h"
#include "pegasus_read_service.h"
+#include "base/value_schema_manager.h"
namespace pegasus {
namespace server {
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 414e2c4..1800827 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -59,7 +59,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_data_cf(nullptr),
_meta_cf(nullptr),
_is_open(false),
- _pegasus_data_version(PEGASUS_DATA_VERSION_MAX),
+ _pegasus_data_version(data_version::VERSION_MAX),
_last_durable_decree(0),
_is_checkpointing(false),
_manual_compact_svc(this),
diff --git a/src/server/test/pegasus_value_schema_test.cpp b/src/server/test/pegasus_value_schema_test.cpp
index 77c8e46..08947e1 100644
--- a/src/server/test/pegasus_value_schema_test.cpp
+++ b/src/server/test/pegasus_value_schema_test.cpp
@@ -18,6 +18,7 @@
*/
#include "base/pegasus_value_schema.h"
+#include "base/value_schema_manager.h"
#include <gtest/gtest.h>
diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp
index e3d8407..ed59532 100644
--- a/src/shell/commands/debugger.cpp
+++ b/src/shell/commands/debugger.cpp
@@ -19,6 +19,7 @@
#include "shell/commands.h"
#include "base/idl_utils.h"
+#include "base/value_schema_manager.h"
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/utilities/ldb_cmd.h>
#include <fmt/time.h>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org