You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this plain-text rendering.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/11/18 05:18:14 UTC
[incubator-datasketches-cpp] branch req_sketch updated: serialize raw items
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git
The following commit(s) were added to refs/heads/req_sketch by this push:
new b096a63 serialize raw items
b096a63 is described below
commit b096a63512c2cf70c4ae1261422a2c1ddd6088ab
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Tue Nov 17 21:18:02 2020 -0800
serialize raw items
---
req/include/req_compactor.hpp | 14 ++++-
req/include/req_compactor_impl.hpp | 54 +++++++++++++++----
req/include/req_sketch_impl.hpp | 108 +++++++++++++++++--------------------
req/test/req_sketch_test.cpp | 34 ++++++++++--
4 files changed, 138 insertions(+), 72 deletions(-)
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
index 977fd46..2c429fa 100755
--- a/req/include/req_compactor.hpp
+++ b/req/include/req_compactor.hpp
@@ -30,7 +30,7 @@ typename Allocator
>
class req_compactor {
public:
- req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, const T* item_ptr = nullptr, bool sorted = true);
+ req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, bool sorted = true);
bool is_sorted() const;
uint32_t get_num_items() const;
@@ -80,6 +80,18 @@ public:
template<typename S>
static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted);
+ template<typename S>
+ static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items);
+
+ template<typename S>
+ static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items);
+
+ template<typename S>
+ static std::vector<T, Allocator> deserialize_items(std::istream& is, const S& serde, const Allocator& allocator, size_t num);
+
+ template<typename S>
+ static std::pair<std::vector<T, Allocator>, size_t> deserialize_items(const void* bytes, size_t size, const S& serde, const Allocator& allocator, size_t num);
+
private:
uint8_t lg_weight_;
bool coin_; // random bit for compaction
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
index dce0a13..cc44ea6 100755
--- a/req/include/req_compactor_impl.hpp
+++ b/req/include/req_compactor_impl.hpp
@@ -30,7 +30,7 @@
namespace datasketches {
template<typename T, bool H, typename C, typename A>
-req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, const T* item_ptr, bool sorted):
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, bool sorted):
lg_weight_(lg_weight),
coin_(false),
sorted_(sorted),
@@ -39,9 +39,7 @@ section_size_(section_size),
num_sections_(req_constants::INIT_NUM_SECTIONS),
state_(0),
items_(allocator)
-{
- if (item_ptr != nullptr) items_.push_back(*item_ptr);
-}
+{}
template<typename T, bool H, typename C, typename A>
bool req_compactor<T, H, C, A>::is_sorted() const {
@@ -256,17 +254,32 @@ req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& i
auto num_sections = read<decltype(num_sections_)>(is);
read<uint16_t>(is); // padding
auto num_items = read<uint32_t>(is);
+ auto items = deserialize_items(is, serde, allocator, num_items);
+ return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items));
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) {
+ auto items = deserialize_items(is, serde, allocator, num_items);
+ return req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items));
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::vector<T, A> req_compactor<T, H, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, size_t num) {
std::vector<T, A> items(allocator);
+ items.reserve(num);
A alloc(allocator);
auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
- for (uint32_t i = 0; i < num_items; ++i) {
+ for (uint32_t i = 0; i < num; ++i) {
serde.deserialize(is, item_buffer.get(), 1);
items.push_back(std::move(*item_buffer));
(*item_buffer).~T();
}
if (!is.good()) throw std::runtime_error("error reading from std::istream");
- return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items));
+ return items;
}
template<typename T, bool H, typename C, typename A>
@@ -287,17 +300,40 @@ std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserial
ptr += 2; // padding
uint32_t num_items;
ptr += copy_from_mem(ptr, num_items);
+ auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
+ ptr += pair.second;
+ return std::pair<req_compactor, size_t>(
+ req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first)),
+ ptr - static_cast<const char*>(bytes)
+ );
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) {
+ auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
+ return std::pair<req_compactor, size_t>(
+ req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first)),
+ pair.second
+ );
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::pair<std::vector<T, A>, size_t> req_compactor<T, H, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, size_t num) {
+ const char* ptr = static_cast<const char*>(bytes);
+ const char* end_ptr = static_cast<const char*>(bytes) + size;
std::vector<T, A> items(allocator);
A alloc(allocator);
auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
- for (uint32_t i = 0; i < num_items; ++i) {
+ for (uint32_t i = 0; i < num; ++i) {
ptr += serde.deserialize(ptr, end_ptr - ptr, item_buffer.get(), 1);
items.push_back(std::move(*item_buffer));
(*item_buffer).~T();
}
- return std::pair<req_compactor, size_t>(
- req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items)),
+ return std::pair<std::vector<T, A>, size_t>(
+ std::move(items),
ptr - static_cast<const char*>(bytes)
);
}
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
index 542a989..96e10c2 100755
--- a/req/include/req_sketch_impl.hpp
+++ b/req/include/req_sketch_impl.hpp
@@ -265,10 +265,10 @@ void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
);
write(os, flags_byte);
write(os, k_);
- const uint8_t num_levels = get_num_levels();
+ const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
write(os, num_levels);
- const uint8_t unused = 0;
- write(os, unused);
+ const uint8_t num_raw_items = raw_items ? n_ : 0;
+ write(os, num_raw_items);
if (is_empty()) return;
if (is_estimation_mode()) {
write(os, n_);
@@ -276,7 +276,7 @@ void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
S().serialize(os, max_value_, 1);
}
if (raw_items) {
- S().serialize(os, min_value_, 1);
+ S().serialize(os, compactors_[0].get_items().data(), num_raw_items);
} else {
for (const auto& compactor: compactors_) compactor.serialize(os, S());
}
@@ -304,10 +304,10 @@ auto req_sketch<T, H, C, S, A>::serialize(unsigned header_size_bytes) const -> v
);
ptr += copy_to_mem(flags_byte, ptr);
ptr += copy_to_mem(k_, ptr);
- const uint8_t num_levels = get_num_levels();
+ const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
ptr += copy_to_mem(num_levels, ptr);
- const uint8_t unused = 0;
- ptr += copy_to_mem(unused, ptr);
+ const uint8_t num_raw_items = raw_items ? n_ : 0;
+ ptr += copy_to_mem(num_raw_items, ptr);
if (!is_empty()) {
if (is_estimation_mode()) {
ptr += copy_to_mem(n_, ptr);
@@ -315,7 +315,7 @@ auto req_sketch<T, H, C, S, A>::serialize(unsigned header_size_bytes) const -> v
ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
}
if (raw_items) {
- ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+ ptr += S().serialize(ptr, end_ptr - ptr, compactors_[0].get_items().data(), num_raw_items);
} else {
for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, S());
}
@@ -331,9 +331,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
const auto flags_byte = read<uint8_t>(is);
const auto k = read<uint16_t>(is);
const auto num_levels = read<uint8_t>(is);
- read<uint8_t>(is); // unused byte
+ const auto num_raw_items = read<uint8_t>(is);
- std::cout << "flags=" << std::hex << ((int)flags_byte) << "\n";
// TODO: checks
if (!is.good()) throw std::runtime_error("error reading from std::istream");
@@ -363,35 +362,29 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
}
if (raw_items) {
- S().deserialize(is, min_value_buffer.get(), 1);
- // serde call did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
- new (max_value_buffer.get()) T(*min_value);
- // copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
- compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted));
+ compactors.push_back(Compactor::deserialize(is, S(), allocator, is_level_0_sorted, k, num_raw_items));
} else {
for (size_t i = 0; i < num_levels; ++i) {
compactors.push_back(Compactor::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true));
}
- if (num_levels == 1) {
- const auto& items = compactors[0].get_items();
- n = items.size();
- auto min_it = items.begin();
- auto max_it = items.begin();
- auto it = items.begin();
- while (it != items.end()) {
- if (C()(*it, *min_it)) min_it = it;
- if (C()(*max_it, *it)) max_it = it;
- ++it;
- }
- new (min_value_buffer.get()) T(*min_it);
- // copy did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
- new (max_value_buffer.get()) T(*max_it);
- // copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+ if (num_levels == 1) {
+ const auto& items = compactors[0].get_items();
+ n = items.size();
+ auto min_it = items.begin();
+ auto max_it = items.begin();
+ auto it = items.begin();
+ while (it != items.end()) {
+ if (C()(*it, *min_it)) min_it = it;
+ if (C()(*max_it, *it)) max_it = it;
+ ++it;
}
+ new (min_value_buffer.get()) T(*min_it);
+ // copy did not throw, repackage with destrtuctor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ new (max_value_buffer.get()) T(*max_it);
+ // copy did not throw, repackage with destrtuctor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
if (!is.good()) throw std::runtime_error("error reading from std::istream");
@@ -416,7 +409,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(const void* byt
ptr += copy_from_mem(ptr, k);
uint8_t num_levels;
ptr += copy_from_mem(ptr, num_levels);
- ++ptr; // unused byte
+ uint8_t num_raw_items;
+ ptr += copy_from_mem(ptr, num_raw_items);
// TODO: checks
// ensure memory size
@@ -447,37 +441,33 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(const void* byt
}
if (raw_items) {
- ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
- // serde call did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
- new (max_value_buffer.get()) T(*min_value);
- // copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
- compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted));
+ auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, is_level_0_sorted, k, num_raw_items);
+ compactors.push_back(std::move(pair.first));
+ ptr += pair.second;
} else {
for (size_t i = 0; i < num_levels; ++i) {
auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, i == 0 ? is_level_0_sorted : true);
compactors.push_back(std::move(pair.first));
ptr += pair.second;
}
- if (num_levels == 1) {
- const auto& items = compactors[0].get_items();
- n = items.size();
- auto min_it = items.begin();
- auto max_it = items.begin();
- auto it = items.begin();
- while (it != items.end()) {
- if (C()(*it, *min_it)) min_it = it;
- if (C()(*max_it, *it)) max_it = it;
- ++it;
- }
- new (min_value_buffer.get()) T(*min_it);
- // copy did not throw, repackage with destrtuctor
- min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
- new (max_value_buffer.get()) T(*max_it);
- // copy did not throw, repackage with destrtuctor
- max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+ }
+ if (num_levels == 1) {
+ const auto& items = compactors[0].get_items();
+ n = items.size();
+ auto min_it = items.begin();
+ auto max_it = items.begin();
+ auto it = items.begin();
+ while (it != items.end()) {
+ if (C()(*it, *min_it)) min_it = it;
+ if (C()(*max_it, *it)) max_it = it;
+ ++it;
}
+ new (min_value_buffer.get()) T(*min_it);
+ // copy did not throw, repackage with destrtuctor
+ min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+ new (max_value_buffer.get()) T(*max_it);
+ // copy did not throw, repackage with destrtuctor
+ max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
return req_sketch(k, n, std::move(min_value), std::move(max_value), std::move(compactors));
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
index 2953a8a..e0e6c9a 100755
--- a/req/test/req_sketch_test.cpp
+++ b/req/test/req_sketch_test.cpp
@@ -301,7 +301,6 @@ TEST_CASE("req sketch: stream deserialize from Java - empty", "[req_sketch]") {
is.exceptions(std::ios::failbit | std::ios::badbit);
is.open(input_path + "req_float_empty_from_java.sk", std::ios::binary);
auto sketch = req_sketch<float, true>::deserialize(is);
- std::cout << sketch.to_string();
REQUIRE(sketch.is_empty());
REQUIRE_FALSE(sketch.is_estimation_mode());
REQUIRE(sketch.get_n() == 0);
@@ -315,13 +314,42 @@ TEST_CASE("req sketch: stream deserialize from Java - single item", "[req_sketch
is.exceptions(std::ios::failbit | std::ios::badbit);
is.open(input_path + "req_float_single_item_from_java.sk", std::ios::binary);
auto sketch = req_sketch<float, true>::deserialize(is);
- std::cout << sketch.to_string();
REQUIRE_FALSE(sketch.is_empty());
REQUIRE_FALSE(sketch.is_estimation_mode());
REQUIRE(sketch.get_n() == 1);
REQUIRE(sketch.get_num_retained() == 1);
REQUIRE(sketch.get_min_value() == 1);
REQUIRE(sketch.get_max_value() == 1);
+ REQUIRE(sketch.get_rank(1) == 0);
+ REQUIRE(sketch.get_rank<true>(1) == 1);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - raw items", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_raw_items_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float, true>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 4);
+ REQUIRE(sketch.get_num_retained() == 4);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 3);
+ REQUIRE(sketch.get_rank(2) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - exact mode", "[req_sketch]") {
+ std::ifstream is;
+ is.exceptions(std::ios::failbit | std::ios::badbit);
+ is.open(input_path + "req_float_exact_from_java.sk", std::ios::binary);
+ auto sketch = req_sketch<float, true>::deserialize(is);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 100);
+ REQUIRE(sketch.get_num_retained() == 100);
+ REQUIRE(sketch.get_min_value() == 0);
+ REQUIRE(sketch.get_max_value() == 99);
+ REQUIRE(sketch.get_rank(50) == 0.5);
}
TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sketch]") {
@@ -329,13 +357,13 @@ TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sk
is.exceptions(std::ios::failbit | std::ios::badbit);
is.open(input_path + "req_float_estimation_from_java.sk", std::ios::binary);
auto sketch = req_sketch<float, true>::deserialize(is);
- std::cout << sketch.to_string();
REQUIRE_FALSE(sketch.is_empty());
REQUIRE(sketch.is_estimation_mode());
REQUIRE(sketch.get_n() == 10000);
REQUIRE(sketch.get_num_retained() == 2942);
REQUIRE(sketch.get_min_value() == 0);
REQUIRE(sketch.get_max_value() == 9999);
+ REQUIRE(sketch.get_rank(5000) == 0.5);
}
TEST_CASE("req sketch: merge", "[req_sketch]") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org