You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/11/10 03:23:50 UTC
[incubator-datasketches-cpp] branch req_sketch updated: serialized
layout change
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git
The following commit(s) were added to refs/heads/req_sketch by this push:
new 78ebd88 serialized layout change
78ebd88 is described below
commit 78ebd88135c7fc97bb1b394173d260f4e1d23712
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Nov 9 18:58:41 2020 -0800
serialized layout change
---
req/include/req_common.hpp | 2 +-
req/include/req_compactor.hpp | 2 +-
req/include/req_compactor_impl.hpp | 8 ++--
req/include/req_sketch.hpp | 6 +--
req/include/req_sketch_impl.hpp | 97 ++++++++++++++++++++++----------------
5 files changed, 67 insertions(+), 48 deletions(-)
diff --git a/req/include/req_common.hpp b/req/include/req_common.hpp
index 1797a36..78ce505 100755
--- a/req/include/req_common.hpp
+++ b/req/include/req_common.hpp
@@ -29,7 +29,7 @@ namespace datasketches {
static std::independent_bits_engine<std::mt19937, 1, unsigned> req_random_bit(std::chrono::system_clock::now().time_since_epoch().count());
namespace req_constants {
- static const uint32_t MIN_K = 4;
+ static const uint16_t MIN_K = 4;
static const uint32_t INIT_NUM_SECTIONS = 3;
}
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
index 6f97813..f939a19 100755
--- a/req/include/req_compactor.hpp
+++ b/req/include/req_compactor.hpp
@@ -30,7 +30,7 @@ typename Allocator
>
class req_compactor {
public:
- req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator);
+ req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, const T* item_ptr = nullptr, bool sorted = true);
bool is_sorted() const;
uint32_t get_num_items() const;
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
index d803832..dc1f87e 100755
--- a/req/include/req_compactor_impl.hpp
+++ b/req/include/req_compactor_impl.hpp
@@ -29,17 +29,19 @@
namespace datasketches {
template<typename T, bool H, typename C, typename A>
-req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator):
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, const T* item_ptr, bool sorted):
lg_weight_(lg_weight),
coin_(false),
-sorted_(true),
+sorted_(sorted),
section_size_raw_(section_size),
section_size_(section_size),
num_sections_(req_constants::INIT_NUM_SECTIONS),
num_compactions_(0),
state_(0),
items_(allocator)
-{}
+{
+ if (item_ptr != nullptr) items_.push_back(*item_ptr);
+}
template<typename T, bool H, typename C, typename A>
bool req_compactor<T, H, C, A>::is_sorted() const {
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
index 6c8912c..117182a 100755
--- a/req/include/req_sketch.hpp
+++ b/req/include/req_sketch.hpp
@@ -38,7 +38,7 @@ public:
using Compactor = req_compactor<T, IsHighRank, Comparator, Allocator>;
using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
- explicit req_sketch(uint32_t k, const Allocator& allocator = Allocator());
+ explicit req_sketch(uint16_t k, const Allocator& allocator = Allocator());
~req_sketch();
// TODO: copy, move, assign
@@ -132,7 +132,7 @@ public:
private:
Allocator allocator_;
- uint32_t k_;
+ uint16_t k_;
uint32_t max_nom_size_;
uint32_t num_retained_;
uint64_t n_;
@@ -142,7 +142,7 @@ private:
static const uint8_t SERIAL_VERSION = 1;
static const uint8_t FAMILY = 17;
- enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM, IS_HIGH_RANK, IS_ESTIMATION_MODE };
+ enum flags { RESERVED1, RESERVED2, IS_EMPTY, IS_HIGH_RANK, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
uint8_t get_num_levels() const;
void grow();
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
index 09e19f2..bf2ce7b 100755
--- a/req/include/req_sketch_impl.hpp
+++ b/req/include/req_sketch_impl.hpp
@@ -25,9 +25,9 @@
namespace datasketches {
template<typename T, bool H, typename C, typename S, typename A>
-req_sketch<T, H, C, S, A>::req_sketch(uint32_t k, const A& allocator):
+req_sketch<T, H, C, S, A>::req_sketch(uint16_t k, const A& allocator):
allocator_(allocator),
-k_(std::max(k & -2, req_constants::MIN_K)), //rounds down one if odd
+k_(std::max(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
max_nom_size_(0),
num_retained_(0),
n_(0),
@@ -136,42 +136,46 @@ const T& req_sketch<T, H, C, S, A>::get_quantile(double rank) const {
template<typename T, bool H, typename C, typename S, typename A>
void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
const uint8_t preamble_longs = 1;
- os.write(reinterpret_cast<const char*>(&preamble_longs), sizeof(preamble_longs));
+ write(os, preamble_longs);
const uint8_t serial_version = SERIAL_VERSION;
- os.write(reinterpret_cast<const char*>(&serial_version), sizeof(serial_version));
+ write(os, serial_version);
const uint8_t family = FAMILY;
- os.write(reinterpret_cast<const char*>(&family), sizeof(family));
+ write(os, family);
const bool is_single_item = n_ == 1;
const uint8_t flags_byte(
(is_empty() ? 1 << flags::IS_EMPTY : 0)
+ | (H ? 1 << flags::IS_HIGH_RANK : 0)
| (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
| (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
- | (H ? 1 << flags::IS_HIGH_RANK : 0)
- | (is_estimation_mode() ? 1 << flags::IS_ESTIMATION_MODE : 0)
);
- os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
- os.write(reinterpret_cast<const char*>(&k_), sizeof(k_));
+ write(os, flags_byte);
+ write(os, k_);
+ const uint8_t num_levels = get_num_levels();
+ write(os, num_levels);
+ const uint8_t unused = 0;
+ write(os, unused);
if (is_empty()) return;
- if (!is_single_item) {
- if (is_estimation_mode()) {
- os.write(reinterpret_cast<const char*>(&n_), sizeof(n_));
- const uint8_t num_levels = compactors_.size();
- os.write(reinterpret_cast<const char*>(&num_levels), sizeof(num_levels));
- }
- // do we want some padding here?
+ if (is_estimation_mode()) {
+ write(os, n_);
S().serialize(os, min_value_, 1);
S().serialize(os, max_value_, 1);
}
- for (const auto& compactor: compactors_) compactor.serialize(os, S());
+ if (is_single_item) {
+ S().serialize(os, min_value_, 1);
+ } else {
+ for (const auto& compactor: compactors_) compactor.serialize(os, S());
+ }
}
template<typename T, bool H, typename C, typename S, typename A>
req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& is, const A& allocator) {
- auto preamble_longs = read<uint8_t>(is);
- auto serial_version = read<uint8_t>(is);
- auto family_id = read<uint8_t>(is);
- auto flags_byte = read<uint8_t>(is);
- auto k = read<uint32_t>(is);
+ const auto preamble_longs = read<uint8_t>(is);
+ const auto serial_version = read<uint8_t>(is);
+ const auto family_id = read<uint8_t>(is);
+ const auto flags_byte = read<uint8_t>(is);
+ const auto k = read<uint16_t>(is);
+ const auto num_levels = read<uint8_t>(is);
+ const auto unused = read<uint8_t>(is);
// TODO: checks
@@ -180,15 +184,7 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
if (is_empty) return req_sketch(k, allocator);
uint64_t n = 1;
- uint8_t num_levels = 1;
- const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
- const bool is_estimation_mode = flags_byte & (1 << flags::IS_ESTIMATION_MODE);
- if (!is_single_item) {
- if (is_estimation_mode) {
- n = read<uint64_t>(is);
- num_levels = read<uint8_t>(is);
- }
- }
+ if (num_levels > 1) n = read<uint64_t>(is);
A alloc(allocator);
auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
@@ -197,7 +193,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
- if (!is_single_item) {
+ const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
+ if (num_levels > 1) {
S().deserialize(is, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
@@ -209,18 +206,38 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
std::vector<Compactor, AllocCompactor> compactors(allocator);
std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
- for (size_t i = 0; i < num_levels; ++i) {
- auto compactor = req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true);
- compactors.push_back(std::move(compactor));
- }
- if (!is_estimation_mode) n = compactors[0].get_num_items();
if (is_single_item) {
- new (min_value_buffer.get()) T(compactors[0].get_items()[0]);
- // copy did not throw, repackage with destrtuctor
+ S().deserialize(is, min_value_buffer.get(), 1);
+ // serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
- new (max_value_buffer.get()) T(compactors[0].get_items()[0]);
+ new (max_value_buffer.get()) T(*min_value);
// copy did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ compactors.push_back(req_compactor<T, H, C, A>(1, k, allocator, min_value.get(), is_level_0_sorted));
+ } else {
+ if (num_levels == 1) {
+ const auto& items = compactors[0].get_items();
+ n = items.size();
+ auto min_it = items.begin();
+ auto max_it = items.begin();
+ auto it = items.begin();
+ while (it != items.end()) {
+ if (C()(*it, *min_it)) min_it = it;
+ if (C()(*max_it, *it)) max_it = it;
+ ++it;
+ }
+ new (min_value_buffer.get()) T(*min_it);
+ // copy did not throw, repackage with destrtuctor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ new (max_value_buffer.get()) T(*max_it);
+ // copy did not throw, repackage with destrtuctor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ } else {
+ for (size_t i = 0; i < num_levels; ++i) {
+ auto compactor = req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true);
+ compactors.push_back(std::move(compactor));
+ }
+ }
}
if (!is.good()) throw std::runtime_error("error reading from std::istream");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org