You are viewing a plain-text version of this content. The canonical link to it (labelled "here" in the original page) was a hyperlink and is not preserved in this copy.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/18 06:58:27 UTC
[incubator-pegasus] branch data-version-2 updated: feat: using value_schema_manager to reimplement value_generator (#757)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch data-version-2
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/data-version-2 by this push:
new 5c7a1ee feat: using value_schema_manager to reimplement value_generator (#757)
5c7a1ee is described below
commit 5c7a1ee6eab8b2b7dbe3b70905ece97bd7c7cd2f
Author: zhao liwei <zl...@163.com>
AuthorDate: Fri Jun 18 14:58:19 2021 +0800
feat: using value_schema_manager to reimplement value_generator (#757)
---
src/base/pegasus_value_schema.h | 118 +-------------------------
src/base/test/value_generator_test.cpp | 45 ++++++++++
src/base/value_generator.cpp | 38 +++++++++
src/base/value_generator.h | 51 +++++++++++
src/base/value_schema_manager.cpp | 4 +
src/base/value_schema_manager.h | 2 +-
src/base/value_schema_v0.cpp | 2 +
src/server/key_ttl_compaction_filter.h | 1 -
src/server/pegasus_mutation_duplicator.cpp | 1 +
src/server/pegasus_server_impl.h | 1 -
src/server/pegasus_server_impl_init.cpp | 1 +
src/server/rocksdb_wrapper.cpp | 10 ++-
src/server/rocksdb_wrapper.h | 5 +-
src/server/test/pegasus_value_schema_test.cpp | 67 ---------------
src/server/test/rocksdb_wrapper_test.cpp | 1 +
15 files changed, 155 insertions(+), 192 deletions(-)
diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 244eb27..3246b28 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -19,129 +19,15 @@
#pragma once
-#include <stdint.h>
-#include <string.h>
#include <string>
#include <vector>
-#include <dsn/utility/ports.h>
-#include <dsn/utility/utils.h>
-#include <dsn/utility/smart_pointers.h>
-#include <dsn/utility/endians.h>
-#include <dsn/dist/fmt_logging.h>
-#include <dsn/service_api_c.h>
#include <rocksdb/slice.h>
+#include <dsn/utility/string_view.h>
#include "value_field.h"
namespace pegasus {
-
-/// Helper class for generating value.
-/// NOTES:
-/// * the instance of pegasus_value_generator must be alive while the returned SliceParts is.
-/// * the data of user_data must be alive be alive while the returned SliceParts is, because
-/// we do not copy it.
-/// * the returned SliceParts is only valid before the next invoking of generate_value().
-class pegasus_value_generator
-{
-public:
- /// A higher level utility for generating value with given version.
- /// The value schema must be in v0 or v1.
- rocksdb::SliceParts generate_value(uint32_t value_schema_version,
- dsn::string_view user_data,
- uint32_t expire_ts,
- uint64_t timetag)
- {
- if (value_schema_version == 0) {
- return generate_value_v0(expire_ts, user_data);
- } else if (value_schema_version == 1) {
- return generate_value_v1(expire_ts, timetag, user_data);
- } else {
- dfatal_f("unsupported value schema version: {}", value_schema_version);
- __builtin_unreachable();
- }
- }
-
- /// The heading expire_ts is encoded to support TTL, and the record will be
- /// automatically cleared (by \see pegasus::server::KeyWithTTLCompactionFilter)
- /// after expiration reached. The expired record will be invisible even though
- /// they are not yet compacted.
- ///
- /// rocksdb value (ver 0) = [expire_ts(uint32_t)] [user_data(bytes)]
- /// \internal
- rocksdb::SliceParts generate_value_v0(uint32_t expire_ts, dsn::string_view user_data)
- {
- _write_buf.resize(sizeof(uint32_t));
- _write_slices.clear();
-
- dsn::data_output(_write_buf).write_u32(expire_ts);
- _write_slices.emplace_back(_write_buf.data(), _write_buf.size());
-
- if (user_data.length() > 0) {
- _write_slices.emplace_back(user_data.data(), user_data.length());
- }
-
- return {&_write_slices[0], static_cast<int>(_write_slices.size())};
- }
-
- /// The value schema here is designed to resolve write conflicts during duplication,
- /// specifically, when two clusters configured as "master-master" are concurrently
- /// writing at the same key.
- ///
- /// Though writings on the same key from two different clusters are rare in
- /// real-world cases, it still gives a bad user experience when it happens.
- /// A simple solution is to separate the writes into two halves, each cluster
- /// is responsible for one half only. How the writes are separated is left to
- /// users. This is simple, but unfriendly to use.
- ///
- /// In our design, each value is provided with a timestamp [0, 2^56-1], which
- /// represents the data version. A write duplicated from remote cluster firstly
- /// compares its timestamp with the current one if exists. The one with
- /// larger timestamp wins.
- ///
- /// An edge case occurs when the two timestamps are completely equal, the final
- /// result is undefined. To solve this we make 7 bits of space for cluster_id
- /// (the globally unique id of a cluster). In case when the timestamps are equal,
- /// the conflicts can be resolved by comparing the cluster id.
- ///
- /// Consider another edge case in which a record is deleted from pegasus, however
- /// in the remote cluster this record is written with a new value:
- ///
- /// A: --------- update(ts:700)---- delete ---- update duplicated from B(ts:500) --
- /// B: ---- update(ts:500) --------------------------------------------------------
- ///
- /// Since the record is removed, the stale update will successfully though
- /// incorrectly apply. To solve this problem there's 1 bit flag marking whether the
- /// record is deleted.
- ///
- /// rocksdb value (ver 1)
- /// = [expire_ts(uint32_t)] [timetag(uint64_t)] [user_data(bytes)]
- /// = [expire_ts(unit32_t)]
- /// [timestamp in μs (56 bit)] [cluster_id (7 bit)] [deleted_tag (1 bit)]
- /// [user_data(bytes)]
- ///
- /// \internal
- rocksdb::SliceParts
- generate_value_v1(uint32_t expire_ts, uint64_t timetag, dsn::string_view user_data)
- {
- _write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
- _write_slices.clear();
-
- dsn::data_output(_write_buf).write_u32(expire_ts).write_u64(timetag);
- _write_slices.emplace_back(_write_buf.data(), _write_buf.size());
-
- if (user_data.length() > 0) {
- _write_slices.emplace_back(user_data.data(), user_data.length());
- }
-
- return {&_write_slices[0], static_cast<int>(_write_slices.size())};
- }
-
-private:
- std::string _write_buf;
- std::vector<rocksdb::Slice> _write_slices;
-};
-
enum data_version
{
VERSION_0 = 0,
@@ -159,7 +45,7 @@ struct value_params
}
std::array<std::unique_ptr<value_field>, FIELD_COUNT> fields;
- // write_buf and write_slices are transferred from `pegasus_value_generator`, which are used to
+ // write_buf and write_slices are transferred from `value_generator`, which are used to
// prevent data copy
std::string &write_buf;
std::vector<rocksdb::Slice> &write_slices;
diff --git a/src/base/test/value_generator_test.cpp b/src/base/test/value_generator_test.cpp
new file mode 100644
index 0000000..6d33564
--- /dev/null
+++ b/src/base/test/value_generator_test.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_generator.h"
+#include "value_schema_manager.h"
+
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+TEST(value_generator, generate_value)
+{
+ value_generator gen;
+ std::string user_data = "user_data";
+ uint32_t expire_ts = 10086;
+ uint64_t timetag = 1000;
+
+ auto sparts = gen.generate_value(user_data, expire_ts, timetag);
+ std::string value;
+ for (int i = 0; i < sparts.num_parts; i++) {
+ value += sparts.parts[i].ToString();
+ }
+
+ ASSERT_EQ(pegasus_extract_expire_ts(VERSION_MAX, value), expire_ts);
+ ASSERT_EQ(pegasus_extract_timetag(VERSION_MAX, value), timetag);
+ dsn::blob extract_user_data;
+ pegasus_extract_user_data(VERSION_MAX, std::move(value), extract_user_data);
+ ASSERT_EQ(extract_user_data.to_string(), user_data);
+}
diff --git a/src/base/value_generator.cpp b/src/base/value_generator.cpp
new file mode 100644
index 0000000..b603599
--- /dev/null
+++ b/src/base/value_generator.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_generator.h"
+#include "value_schema_manager.h"
+#include <dsn/utility/smart_pointers.h>
+
+namespace pegasus {
+/// A higher level utility for generating value with given version.
+rocksdb::SliceParts
+value_generator::generate_value(dsn::string_view user_data, uint32_t expire_ts, uint64_t timetag)
+{
+ value_params params(_write_buf, _write_slices);
+ params.fields[value_field_type::EXPIRE_TIMESTAMP] =
+ dsn::make_unique<expire_timestamp_field>(expire_ts);
+ params.fields[value_field_type::TIME_TAG] = dsn::make_unique<time_tag_field>(timetag);
+ params.fields[value_field_type::USER_DATA] = dsn::make_unique<user_data_field>(user_data);
+
+ auto schema = value_schema_manager::instance().get_latest_value_schema();
+ return schema->generate_value(params);
+}
+} // namespace pegasus
diff --git a/src/base/value_generator.h b/src/base/value_generator.h
new file mode 100644
index 0000000..52d0aca
--- /dev/null
+++ b/src/base/value_generator.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <rocksdb/slice.h>
+#include <dsn/utility/string_view.h>
+
+namespace pegasus {
+/**
+ * Helper class for generating value.
+ *
+ * NOTES:
+ * 1. The instance of value_generator must be alive while the returned SliceParts is.
+ * The data of user_data must be alive while the returned SliceParts is, because
+ * we do not copy it. And the returned SliceParts is only valid before the next invoking of
+ * generate_value().
+ * 2. Only the latest version of data is generated in this function, because we want to make
+ * data with older version disappear gradually.
+ */
+class value_generator
+{
+public:
+ value_generator() = default;
+ ~value_generator() = default;
+
+ /// A higher level utility for generating value with latest version.
+ rocksdb::SliceParts
+ generate_value(dsn::string_view user_data, uint32_t expire_ts, uint64_t timetag);
+
+private:
+ std::string _write_buf;
+ std::vector<rocksdb::Slice> _write_slices;
+};
+} // namespace pegasus
diff --git a/src/base/value_schema_manager.cpp b/src/base/value_schema_manager.cpp
index 23d53cd..ecaab95 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -22,6 +22,10 @@
#include "value_schema_v1.h"
#include "value_schema_v2.h"
+#include <dsn/utility/endians.h>
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/c/api_utilities.h>
+
namespace pegasus {
value_schema_manager::value_schema_manager()
{
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 4582835..e60211c 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -21,6 +21,7 @@
#include "pegasus_value_schema.h"
#include <dsn/utility/singleton.h>
+#include <dsn/utility/smart_pointers.h>
namespace pegasus {
@@ -42,7 +43,6 @@ private:
};
/// Generates timetag in host endian.
-/// \see comment on pegasus_value_generator::generate_value_v1
inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
{
return timestamp << 8u | cluster_id << 1u | deleted_tag;
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
index 850c07c..36911c5 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v0.cpp
@@ -19,6 +19,8 @@
#include "value_schema_v0.h"
+#include <dsn/utility/endians.h>
+#include <dsn/c/api_utilities.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/smart_pointers.h>
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 1992e5a..c1bdb8c 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -90,7 +90,6 @@ private:
uint32_t _pegasus_data_version;
uint32_t _default_ttl;
bool _enabled; // only process filtering when _enabled == true
- mutable pegasus_value_generator _gen;
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
index 7244f4c..0e28fa5 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -24,6 +24,7 @@
#include <dsn/cpp/message_utils.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/duplication_common.h>
+#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb.client.h>
namespace dsn {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index d73567e..4a26577 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -36,7 +36,6 @@
#include "pegasus_write_service.h"
#include "range_read_limiter.h"
#include "pegasus_read_service.h"
-#include "base/value_schema_manager.h"
namespace pegasus {
namespace server {
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 1800827..24db514 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -22,6 +22,7 @@
#include <unordered_map>
#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>
+#include <dsn/dist/fmt_logging.h>
#include "capacity_unit_calculator.h"
#include "hashkey_transform.h"
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 2974693..a46832d 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -22,7 +22,7 @@
#include <dsn/utility/fail_point.h>
#include <rocksdb/db.h>
#include "pegasus_write_service_impl.h"
-#include "base/pegasus_value_schema.h"
+#include "base/value_generator.h"
namespace pegasus {
namespace server {
@@ -37,13 +37,15 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
_default_ttl(0)
{
_write_batch = dsn::make_unique<rocksdb::WriteBatch>();
- _value_generator = dsn::make_unique<pegasus_value_generator>();
+ _value_generator = dsn::make_unique<value_generator>();
_wt_opts = dsn::make_unique<rocksdb::WriteOptions>();
// disable write ahead logging as replication handles logging instead now
_wt_opts->disableWAL = true;
}
+rocksdb_wrapper::~rocksdb_wrapper() = default;
+
int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });
@@ -119,8 +121,8 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
- rocksdb::SliceParts svalue = _value_generator->generate_value(
- _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
+ rocksdb::SliceParts svalue =
+ _value_generator->generate_value(value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index b29bb7d..43f73d6 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -35,7 +35,7 @@ class perf_counter_wrapper;
} // namespace dsn
namespace pegasus {
-class pegasus_value_generator;
+class value_generator;
namespace server {
struct db_get_context;
@@ -46,6 +46,7 @@ class rocksdb_wrapper : public dsn::replication::replica_base
{
public:
rocksdb_wrapper(pegasus_server_impl *server);
+ ~rocksdb_wrapper();
/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
@@ -73,7 +74,7 @@ private:
rocksdb::DB *_db;
rocksdb::ReadOptions &_rd_opts;
- std::unique_ptr<pegasus_value_generator> _value_generator;
+ std::unique_ptr<value_generator> _value_generator;
std::unique_ptr<rocksdb::WriteBatch> _write_batch;
std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
rocksdb::ColumnFamilyHandle *_meta_cf;
diff --git a/src/server/test/pegasus_value_schema_test.cpp b/src/server/test/pegasus_value_schema_test.cpp
deleted file mode 100644
index 08947e1..0000000
--- a/src/server/test/pegasus_value_schema_test.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "base/pegasus_value_schema.h"
-#include "base/value_schema_manager.h"
-
-#include <gtest/gtest.h>
-
-using namespace pegasus;
-
-TEST(value_schema, generate_and_extract_v1_v0)
-{
- struct test_case
- {
- int value_schema_version;
-
- uint32_t expire_ts;
- uint64_t timetag;
- std::string user_data;
- } tests[] = {
- {1, 1000, 10001, ""},
- {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
- {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},
-
- {0, 1000, 0, ""},
- {0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
- {0, std::numeric_limits<uint32_t>::max(), 0, ""},
- {0, 0, 0, "a"},
- };
-
- for (auto &t : tests) {
- pegasus_value_generator gen;
- rocksdb::SliceParts sparts =
- gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts, t.timetag);
-
- std::string raw_value;
- for (int i = 0; i < sparts.num_parts; i++) {
- raw_value += sparts.parts[i].ToString();
- }
-
- ASSERT_EQ(t.expire_ts, pegasus_extract_expire_ts(t.value_schema_version, raw_value));
-
- if (t.value_schema_version == 1) {
- ASSERT_EQ(t.timetag, pegasus_extract_timetag(t.value_schema_version, raw_value));
- }
-
- dsn::blob user_data;
- pegasus_extract_user_data(t.value_schema_version, std::move(raw_value), user_data);
- ASSERT_EQ(t.user_data, user_data.to_string());
- }
-}
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 313f522..99f2972 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -20,6 +20,7 @@
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
#include "pegasus_server_test_base.h"
+#include "base/value_generator.h"
namespace pegasus {
namespace server {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org