You are viewing a plain text version of this content. The canonical version is available via a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/18 06:58:27 UTC

[incubator-pegasus] branch data-version-2 updated: feat: using value_schema_manager to reimplement value_generator (#757)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch data-version-2
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/data-version-2 by this push:
     new 5c7a1ee  feat: using value_schema_manager to reimplement value_generator (#757)
5c7a1ee is described below

commit 5c7a1ee6eab8b2b7dbe3b70905ece97bd7c7cd2f
Author: zhao liwei <zl...@163.com>
AuthorDate: Fri Jun 18 14:58:19 2021 +0800

    feat: using value_schema_manager to reimplement value_generator (#757)
---
 src/base/pegasus_value_schema.h               | 118 +-------------------------
 src/base/test/value_generator_test.cpp        |  45 ++++++++++
 src/base/value_generator.cpp                  |  38 +++++++++
 src/base/value_generator.h                    |  51 +++++++++++
 src/base/value_schema_manager.cpp             |   4 +
 src/base/value_schema_manager.h               |   2 +-
 src/base/value_schema_v0.cpp                  |   2 +
 src/server/key_ttl_compaction_filter.h        |   1 -
 src/server/pegasus_mutation_duplicator.cpp    |   1 +
 src/server/pegasus_server_impl.h              |   1 -
 src/server/pegasus_server_impl_init.cpp       |   1 +
 src/server/rocksdb_wrapper.cpp                |  10 ++-
 src/server/rocksdb_wrapper.h                  |   5 +-
 src/server/test/pegasus_value_schema_test.cpp |  67 ---------------
 src/server/test/rocksdb_wrapper_test.cpp      |   1 +
 15 files changed, 155 insertions(+), 192 deletions(-)

diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h
index 244eb27..3246b28 100644
--- a/src/base/pegasus_value_schema.h
+++ b/src/base/pegasus_value_schema.h
@@ -19,129 +19,15 @@
 
 #pragma once
 
-#include <stdint.h>
-#include <string.h>
 #include <string>
 #include <vector>
 
-#include <dsn/utility/ports.h>
-#include <dsn/utility/utils.h>
-#include <dsn/utility/smart_pointers.h>
-#include <dsn/utility/endians.h>
-#include <dsn/dist/fmt_logging.h>
-#include <dsn/service_api_c.h>
 #include <rocksdb/slice.h>
+#include <dsn/utility/string_view.h>
 
 #include "value_field.h"
 
 namespace pegasus {
-
-/// Helper class for generating value.
-/// NOTES:
-/// * the instance of pegasus_value_generator must be alive while the returned SliceParts is.
-/// * the data of user_data must be alive be alive while the returned SliceParts is, because
-///   we do not copy it.
-/// * the returned SliceParts is only valid before the next invoking of generate_value().
-class pegasus_value_generator
-{
-public:
-    /// A higher level utility for generating value with given version.
-    /// The value schema must be in v0 or v1.
-    rocksdb::SliceParts generate_value(uint32_t value_schema_version,
-                                       dsn::string_view user_data,
-                                       uint32_t expire_ts,
-                                       uint64_t timetag)
-    {
-        if (value_schema_version == 0) {
-            return generate_value_v0(expire_ts, user_data);
-        } else if (value_schema_version == 1) {
-            return generate_value_v1(expire_ts, timetag, user_data);
-        } else {
-            dfatal_f("unsupported value schema version: {}", value_schema_version);
-            __builtin_unreachable();
-        }
-    }
-
-    /// The heading expire_ts is encoded to support TTL, and the record will be
-    /// automatically cleared (by \see pegasus::server::KeyWithTTLCompactionFilter)
-    /// after expiration reached. The expired record will be invisible even though
-    /// they are not yet compacted.
-    ///
-    /// rocksdb value (ver 0) = [expire_ts(uint32_t)] [user_data(bytes)]
-    /// \internal
-    rocksdb::SliceParts generate_value_v0(uint32_t expire_ts, dsn::string_view user_data)
-    {
-        _write_buf.resize(sizeof(uint32_t));
-        _write_slices.clear();
-
-        dsn::data_output(_write_buf).write_u32(expire_ts);
-        _write_slices.emplace_back(_write_buf.data(), _write_buf.size());
-
-        if (user_data.length() > 0) {
-            _write_slices.emplace_back(user_data.data(), user_data.length());
-        }
-
-        return {&_write_slices[0], static_cast<int>(_write_slices.size())};
-    }
-
-    /// The value schema here is designed to resolve write conflicts during duplication,
-    /// specifically, when two clusters configured as "master-master" are concurrently
-    /// writing at the same key.
-    ///
-    /// Though writings on the same key from two different clusters are rare in
-    /// real-world cases, it still gives a bad user experience when it happens.
-    /// A simple solution is to separate the writes into two halves, each cluster
-    /// is responsible for one half only. How the writes are separated is left to
-    /// users. This is simple, but unfriendly to use.
-    ///
-    /// In our design, each value is provided with a timestamp [0, 2^56-1], which
-    /// represents the data version. A write duplicated from remote cluster firstly
-    /// compares its timestamp with the current one if exists. The one with
-    /// larger timestamp wins.
-    ///
-    /// An edge case occurs when the two timestamps are completely equal, the final
-    /// result is undefined. To solve this we make 7 bits of space for cluster_id
-    /// (the globally unique id of a cluster). In case when the timestamps are equal,
-    /// the conflicts can be resolved by comparing the cluster id.
-    ///
-    /// Consider another edge case in which a record is deleted from pegasus, however
-    /// in the remote cluster this record is written with a new value:
-    ///
-    ///   A: --------- update(ts:700)---- delete ---- update duplicated from B(ts:500) --
-    ///   B: ---- update(ts:500) --------------------------------------------------------
-    ///
-    /// Since the record is removed, the stale update will successfully though
-    /// incorrectly apply. To solve this problem there's 1 bit flag marking whether the
-    /// record is deleted.
-    ///
-    /// rocksdb value (ver 1)
-    ///  = [expire_ts(uint32_t)] [timetag(uint64_t)] [user_data(bytes)]
-    ///  = [expire_ts(unit32_t)]
-    ///    [timestamp in μs (56 bit)] [cluster_id (7 bit)] [deleted_tag (1 bit)]
-    ///    [user_data(bytes)]
-    ///
-    /// \internal
-    rocksdb::SliceParts
-    generate_value_v1(uint32_t expire_ts, uint64_t timetag, dsn::string_view user_data)
-    {
-        _write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
-        _write_slices.clear();
-
-        dsn::data_output(_write_buf).write_u32(expire_ts).write_u64(timetag);
-        _write_slices.emplace_back(_write_buf.data(), _write_buf.size());
-
-        if (user_data.length() > 0) {
-            _write_slices.emplace_back(user_data.data(), user_data.length());
-        }
-
-        return {&_write_slices[0], static_cast<int>(_write_slices.size())};
-    }
-
-private:
-    std::string _write_buf;
-    std::vector<rocksdb::Slice> _write_slices;
-};
-
 enum data_version
 {
     VERSION_0 = 0,
@@ -159,7 +45,7 @@ struct value_params
     }
 
     std::array<std::unique_ptr<value_field>, FIELD_COUNT> fields;
-    // write_buf and write_slices are transferred from `pegasus_value_generator`, which are used to
+    // write_buf and write_slices are transferred from `value_generator`, which are used to
     // prevent data copy
     std::string &write_buf;
     std::vector<rocksdb::Slice> &write_slices;
diff --git a/src/base/test/value_generator_test.cpp b/src/base/test/value_generator_test.cpp
new file mode 100644
index 0000000..6d33564
--- /dev/null
+++ b/src/base/test/value_generator_test.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_generator.h"
+#include "value_schema_manager.h"
+
+#include <gtest/gtest.h>
+
+using namespace pegasus;
+
+TEST(value_generator, generate_value)
+{
+    value_generator gen;
+    std::string user_data = "user_data";
+    uint32_t expire_ts = 10086;
+    uint64_t timetag = 1000;
+
+    auto sparts = gen.generate_value(user_data, expire_ts, timetag);
+    std::string value;
+    for (int i = 0; i < sparts.num_parts; i++) {
+        value += sparts.parts[i].ToString();
+    }
+
+    ASSERT_EQ(pegasus_extract_expire_ts(VERSION_MAX, value), expire_ts);
+    ASSERT_EQ(pegasus_extract_timetag(VERSION_MAX, value), timetag);
+    dsn::blob extract_user_data;
+    pegasus_extract_user_data(VERSION_MAX, std::move(value), extract_user_data);
+    ASSERT_EQ(extract_user_data.to_string(), user_data);
+}
diff --git a/src/base/value_generator.cpp b/src/base/value_generator.cpp
new file mode 100644
index 0000000..b603599
--- /dev/null
+++ b/src/base/value_generator.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "value_generator.h"
+#include "value_schema_manager.h"
+#include <dsn/utility/smart_pointers.h>
+
+namespace pegasus {
+/// A higher level utility for generating value with given version.
+rocksdb::SliceParts
+value_generator::generate_value(dsn::string_view user_data, uint32_t expire_ts, uint64_t timetag)
+{
+    value_params params(_write_buf, _write_slices);
+    params.fields[value_field_type::EXPIRE_TIMESTAMP] =
+        dsn::make_unique<expire_timestamp_field>(expire_ts);
+    params.fields[value_field_type::TIME_TAG] = dsn::make_unique<time_tag_field>(timetag);
+    params.fields[value_field_type::USER_DATA] = dsn::make_unique<user_data_field>(user_data);
+
+    auto schema = value_schema_manager::instance().get_latest_value_schema();
+    return schema->generate_value(params);
+}
+} // namespace pegasus
diff --git a/src/base/value_generator.h b/src/base/value_generator.h
new file mode 100644
index 0000000..52d0aca
--- /dev/null
+++ b/src/base/value_generator.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <rocksdb/slice.h>
+#include <dsn/utility/string_view.h>
+
+namespace pegasus {
+/**
+ * Helper class for generating value.
+ *
+ * NOTES:
+ * 1. The instance of value_generator must be alive while the returned SliceParts is.
+ * The data of user_data must be alive while the returned SliceParts is, because
+ * we do not copy it. And the returned SliceParts is only valid before the next invoking of
+ * generate_value().
+ * 2. Only the latest version of data is generated in this function, because we want to make
+ * data with older version disappear gradually.
+ */
+class value_generator
+{
+public:
+    value_generator() = default;
+    ~value_generator() = default;
+
+    /// A higher level utility for generating value with latest version.
+    rocksdb::SliceParts
+    generate_value(dsn::string_view user_data, uint32_t expire_ts, uint64_t timetag);
+
+private:
+    std::string _write_buf;
+    std::vector<rocksdb::Slice> _write_slices;
+};
+} // namespace pegasus
diff --git a/src/base/value_schema_manager.cpp b/src/base/value_schema_manager.cpp
index 23d53cd..ecaab95 100644
--- a/src/base/value_schema_manager.cpp
+++ b/src/base/value_schema_manager.cpp
@@ -22,6 +22,10 @@
 #include "value_schema_v1.h"
 #include "value_schema_v2.h"
 
+#include <dsn/utility/endians.h>
+#include <dsn/dist/fmt_logging.h>
+#include <dsn/c/api_utilities.h>
+
 namespace pegasus {
 value_schema_manager::value_schema_manager()
 {
diff --git a/src/base/value_schema_manager.h b/src/base/value_schema_manager.h
index 4582835..e60211c 100644
--- a/src/base/value_schema_manager.h
+++ b/src/base/value_schema_manager.h
@@ -21,6 +21,7 @@
 
 #include "pegasus_value_schema.h"
 #include <dsn/utility/singleton.h>
+#include <dsn/utility/smart_pointers.h>
 
 namespace pegasus {
 
@@ -42,7 +43,6 @@ private:
 };
 
 /// Generates timetag in host endian.
-/// \see comment on pegasus_value_generator::generate_value_v1
 inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
 {
     return timestamp << 8u | cluster_id << 1u | deleted_tag;
diff --git a/src/base/value_schema_v0.cpp b/src/base/value_schema_v0.cpp
index 850c07c..36911c5 100644
--- a/src/base/value_schema_v0.cpp
+++ b/src/base/value_schema_v0.cpp
@@ -19,6 +19,8 @@
 
 #include "value_schema_v0.h"
 
+#include <dsn/utility/endians.h>
+#include <dsn/c/api_utilities.h>
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/smart_pointers.h>
 
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index 1992e5a..c1bdb8c 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -90,7 +90,6 @@ private:
     uint32_t _pegasus_data_version;
     uint32_t _default_ttl;
     bool _enabled; // only process filtering when _enabled == true
-    mutable pegasus_value_generator _gen;
     int32_t _partition_index;
     int32_t _partition_version;
     bool _validate_partition_hash;
diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
index 7244f4c..0e28fa5 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -24,6 +24,7 @@
 #include <dsn/cpp/message_utils.h>
 #include <dsn/utility/chrono_literals.h>
 #include <dsn/dist/replication/duplication_common.h>
+#include <dsn/dist/fmt_logging.h>
 #include <rrdb/rrdb.client.h>
 
 namespace dsn {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index d73567e..4a26577 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -36,7 +36,6 @@
 #include "pegasus_write_service.h"
 #include "range_read_limiter.h"
 #include "pegasus_read_service.h"
-#include "base/value_schema_manager.h"
 
 namespace pegasus {
 namespace server {
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 1800827..24db514 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -22,6 +22,7 @@
 #include <unordered_map>
 #include <dsn/utility/flags.h>
 #include <rocksdb/filter_policy.h>
+#include <dsn/dist/fmt_logging.h>
 
 #include "capacity_unit_calculator.h"
 #include "hashkey_transform.h"
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 2974693..a46832d 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -22,7 +22,7 @@
 #include <dsn/utility/fail_point.h>
 #include <rocksdb/db.h>
 #include "pegasus_write_service_impl.h"
-#include "base/pegasus_value_schema.h"
+#include "base/value_generator.h"
 
 namespace pegasus {
 namespace server {
@@ -37,13 +37,15 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
       _default_ttl(0)
 {
     _write_batch = dsn::make_unique<rocksdb::WriteBatch>();
-    _value_generator = dsn::make_unique<pegasus_value_generator>();
+    _value_generator = dsn::make_unique<value_generator>();
 
     _wt_opts = dsn::make_unique<rocksdb::WriteOptions>();
     // disable write ahead logging as replication handles logging instead now
     _wt_opts->disableWAL = true;
 }
 
+rocksdb_wrapper::~rocksdb_wrapper() = default;
+
 int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
 {
     FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });
@@ -119,8 +121,8 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
 
     rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
     rocksdb::SliceParts skey_parts(&skey, 1);
-    rocksdb::SliceParts svalue = _value_generator->generate_value(
-        _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
+    rocksdb::SliceParts svalue =
+        _value_generator->generate_value(value, db_expire_ts(expire_sec), new_timetag);
     rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
     if (dsn_unlikely(!s.ok())) {
         ::dsn::blob hash_key, sort_key;
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index b29bb7d..43f73d6 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -35,7 +35,7 @@ class perf_counter_wrapper;
 } // namespace dsn
 
 namespace pegasus {
-class pegasus_value_generator;
+class value_generator;
 
 namespace server {
 struct db_get_context;
@@ -46,6 +46,7 @@ class rocksdb_wrapper : public dsn::replication::replica_base
 {
 public:
     rocksdb_wrapper(pegasus_server_impl *server);
+    ~rocksdb_wrapper();
 
     /// Calls RocksDB Get and store the result into `db_get_context`.
     /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
@@ -73,7 +74,7 @@ private:
 
     rocksdb::DB *_db;
     rocksdb::ReadOptions &_rd_opts;
-    std::unique_ptr<pegasus_value_generator> _value_generator;
+    std::unique_ptr<value_generator> _value_generator;
     std::unique_ptr<rocksdb::WriteBatch> _write_batch;
     std::unique_ptr<rocksdb::WriteOptions> _wt_opts;
     rocksdb::ColumnFamilyHandle *_meta_cf;
diff --git a/src/server/test/pegasus_value_schema_test.cpp b/src/server/test/pegasus_value_schema_test.cpp
deleted file mode 100644
index 08947e1..0000000
--- a/src/server/test/pegasus_value_schema_test.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "base/pegasus_value_schema.h"
-#include "base/value_schema_manager.h"
-
-#include <gtest/gtest.h>
-
-using namespace pegasus;
-
-TEST(value_schema, generate_and_extract_v1_v0)
-{
-    struct test_case
-    {
-        int value_schema_version;
-
-        uint32_t expire_ts;
-        uint64_t timetag;
-        std::string user_data;
-    } tests[] = {
-        {1, 1000, 10001, ""},
-        {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
-        {1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},
-
-        {0, 1000, 0, ""},
-        {0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
-        {0, std::numeric_limits<uint32_t>::max(), 0, ""},
-        {0, 0, 0, "a"},
-    };
-
-    for (auto &t : tests) {
-        pegasus_value_generator gen;
-        rocksdb::SliceParts sparts =
-            gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts, t.timetag);
-
-        std::string raw_value;
-        for (int i = 0; i < sparts.num_parts; i++) {
-            raw_value += sparts.parts[i].ToString();
-        }
-
-        ASSERT_EQ(t.expire_ts, pegasus_extract_expire_ts(t.value_schema_version, raw_value));
-
-        if (t.value_schema_version == 1) {
-            ASSERT_EQ(t.timetag, pegasus_extract_timetag(t.value_schema_version, raw_value));
-        }
-
-        dsn::blob user_data;
-        pegasus_extract_user_data(t.value_schema_version, std::move(raw_value), user_data);
-        ASSERT_EQ(t.user_data, user_data.to_string());
-    }
-}
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 313f522..99f2972 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -20,6 +20,7 @@
 #include "server/pegasus_server_write.h"
 #include "server/pegasus_write_service_impl.h"
 #include "pegasus_server_test_base.h"
+#include "base/value_generator.h"
 
 namespace pegasus {
 namespace server {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org