You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/02/24 22:43:14 UTC
[incubator-datasketches-cpp] 01/01: moved implementation to separate files
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch moved_implementation
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git
commit e6adf27069dccb2f5e2640e08eed4ae4383f7fd7
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Feb 24 14:42:59 2020 -0800
moved implementation to separate files
---
fi/include/frequent_items_sketch.hpp | 417 +--------------------
...s_sketch.hpp => frequent_items_sketch_impl.hpp} | 75 +---
fi/include/reverse_purge_hash_map.hpp | 314 +---------------
...ash_map.hpp => reverse_purge_hash_map_impl.hpp} | 84 +----
kll/include/kll_sketch.hpp | 26 +-
kll/include/kll_sketch_impl.hpp | 28 ++
6 files changed, 41 insertions(+), 903 deletions(-)
diff --git a/fi/include/frequent_items_sketch.hpp b/fi/include/frequent_items_sketch.hpp
index ed7b70f..310e41d 100644
--- a/fi/include/frequent_items_sketch.hpp
+++ b/fi/include/frequent_items_sketch.hpp
@@ -23,8 +23,6 @@
#include <memory>
#include <vector>
#include <iostream>
-#include <streambuf>
-#include <cstring>
#include <functional>
#include "reverse_purge_hash_map.hpp"
@@ -92,421 +90,8 @@ private:
static void check_size(uint8_t lg_cur_size, uint8_t lg_max_size);
};
-// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
-template<typename T, typename H, typename E, typename S, typename A>
-const uint8_t frequent_items_sketch<T, H, E, S, A>::LG_MIN_MAP_SIZE;
-
-template<typename T, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, H, E, S, A>::frequent_items_sketch(uint8_t lg_max_map_size):
-total_weight(0),
-offset(0),
-map(frequent_items_sketch::LG_MIN_MAP_SIZE, std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE))
-{
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, H, E, S, A>::frequent_items_sketch(uint8_t lg_start_map_size, uint8_t lg_max_map_size):
-total_weight(0),
-offset(0),
-map(std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE), std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE))
-{
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::update(const T& item, uint64_t weight) {
- if (weight == 0) return;
- total_weight += weight;
- offset += map.adjust_or_insert(item, weight);
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::update(T&& item, uint64_t weight) {
- if (weight == 0) return;
- total_weight += weight;
- offset += map.adjust_or_insert(std::move(item), weight);
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::merge(const frequent_items_sketch& other) {
- if (other.is_empty()) return;
- const uint64_t merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
- for (auto &it: other.map) {
- update(it.first, it.second);
- }
- offset += other.offset;
- total_weight = merged_total_weight;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::merge(frequent_items_sketch&& other) {
- if (other.is_empty()) return;
- const uint64_t merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
- for (auto &it: other.map) {
- update(std::move(it.first), it.second);
- }
- offset += other.offset;
- total_weight = merged_total_weight;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-bool frequent_items_sketch<T, H, E, S, A>::is_empty() const {
- return map.get_num_active() == 0;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint32_t frequent_items_sketch<T, H, E, S, A>::get_num_active_items() const {
- return map.get_num_active();
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint64_t frequent_items_sketch<T, H, E, S, A>::get_total_weight() const {
- return total_weight;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint64_t frequent_items_sketch<T, H, E, S, A>::get_estimate(const T& item) const {
- // if item is tracked estimate = weight + offset, otherwise 0
- const uint64_t weight = map.get(item);
- if (weight > 0) return weight + offset;
- return 0;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint64_t frequent_items_sketch<T, H, E, S, A>::get_lower_bound(const T& item) const {
- return map.get(item);
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint64_t frequent_items_sketch<T, H, E, S, A>::get_upper_bound(const T& item) const {
- return map.get(item) + offset;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-uint64_t frequent_items_sketch<T, H, E, S, A>::get_maximum_error() const {
- return offset;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-double frequent_items_sketch<T, H, E, S, A>::get_epsilon() const {
- return EPSILON_FACTOR / (1 << map.get_lg_max_size());
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-double frequent_items_sketch<T, H, E, S, A>::get_epsilon(uint8_t lg_max_map_size) {
- return EPSILON_FACTOR / (1 << lg_max_map_size);
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-double frequent_items_sketch<T, H, E, S, A>::get_apriori_error(uint8_t lg_max_map_size, uint64_t estimated_total_weight) {
- return get_epsilon(lg_max_map_size) * estimated_total_weight;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-class frequent_items_sketch<T, H, E, S, A>::row {
-public:
- row(const T* item, uint64_t weight, uint64_t offset):
- item(item), weight(weight), offset(offset) {}
- const T& get_item() const { return *item; }
- uint64_t get_estimate() const { return weight + offset; }
- uint64_t get_lower_bound() const { return weight; }
- uint64_t get_upper_bound() const { return weight + offset; }
-private:
- const T* item;
- uint64_t weight;
- uint64_t offset;
-};
-
-template<typename T, typename H, typename E, typename S, typename A>
-std::vector<typename frequent_items_sketch<T, H, E, S, A>::row, typename frequent_items_sketch<T, H, E, S, A>::AllocRow>
-frequent_items_sketch<T, H, E, S, A>::get_frequent_items(frequent_items_error_type err_type, uint64_t threshold) const {
- if (threshold == USE_MAX_ERROR) {
- threshold = get_maximum_error();
- }
-
- std::vector<row, AllocRow> items;
- for (auto &it: map) {
- const uint64_t lb = it.second;
- const uint64_t ub = it.second + offset;
- if ((err_type == NO_FALSE_NEGATIVES and ub > threshold) or (err_type == NO_FALSE_POSITIVES and lb > threshold)) {
- items.push_back(row(&it.first, it.second, offset));
- }
- }
- // sort by estimate in descending order
- std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
- return items;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::serialize(std::ostream& os) const {
- const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
- os.write((char*)&preamble_longs, sizeof(preamble_longs));
- const uint8_t serial_version = SERIAL_VERSION;
- os.write((char*)&serial_version, sizeof(serial_version));
- const uint8_t family = FAMILY_ID;
- os.write((char*)&family, sizeof(family));
- const uint8_t lg_max_size = map.get_lg_max_size();
- os.write((char*)&lg_max_size, sizeof(lg_max_size));
- const uint8_t lg_cur_size = map.get_lg_cur_size();
- os.write((char*)&lg_cur_size, sizeof(lg_cur_size));
- const uint8_t flags_byte(
- (is_empty() ? 1 << flags::IS_EMPTY : 0)
- );
- os.write((char*)&flags_byte, sizeof(flags_byte));
- const uint16_t unused16 = 0;
- os.write((char*)&unused16, sizeof(unused16));
- if (!is_empty()) {
- const uint32_t num_items = map.get_num_active();
- os.write((char*)&num_items, sizeof(num_items));
- const uint32_t unused32 = 0;
- os.write((char*)&unused32, sizeof(unused32));
- os.write((char*)&total_weight, sizeof(total_weight));
- os.write((char*)&offset, sizeof(offset));
-
- // copy active items and their weights to use batch serialization
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
- uint64_t* weights = AllocU64().allocate(num_items);
- T* items = A().allocate(num_items);
- uint32_t i = 0;
- for (auto &it: map) {
- new (&items[i]) T(it.first);
- weights[i++] = it.second;
- }
- os.write((char*)weights, sizeof(uint64_t) * num_items);
- AllocU64().deallocate(weights, num_items);
- S().serialize(os, items, num_items);
- for (unsigned i = 0; i < num_items; i++) items[i].~T();
- A().deallocate(items, num_items);
- }
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-size_t frequent_items_sketch<T, H, E, S, A>::get_serialized_size_bytes() const {
- if (is_empty()) return PREAMBLE_LONGS_EMPTY * sizeof(uint64_t);
- size_t size = (PREAMBLE_LONGS_NONEMPTY + map.get_num_active()) * sizeof(uint64_t);
- for (auto &it: map) size += S().size_of_item(it.first);
- return size;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-vector_u8<A> frequent_items_sketch<T, H, E, S, A>::serialize(unsigned header_size_bytes) const {
- const size_t size = header_size_bytes + get_serialized_size_bytes();
- vector_u8<A> bytes(size);
- uint8_t* ptr = bytes.data() + header_size_bytes;
-
- const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
- ptr += copy_to_mem(&preamble_longs, ptr, sizeof(uint8_t));
- const uint8_t serial_version = SERIAL_VERSION;
- ptr += copy_to_mem(&serial_version, ptr, sizeof(uint8_t));
- const uint8_t family = FAMILY_ID;
- ptr += copy_to_mem(&family, ptr, sizeof(uint8_t));
- const uint8_t lg_max_size = map.get_lg_max_size();
- ptr += copy_to_mem(&lg_max_size, ptr, sizeof(uint8_t));
- const uint8_t lg_cur_size = map.get_lg_cur_size();
- ptr += copy_to_mem(&lg_cur_size, ptr, sizeof(uint8_t));
- const uint8_t flags_byte(
- (is_empty() ? 1 << flags::IS_EMPTY : 0)
- );
- ptr += copy_to_mem(&flags_byte, ptr, sizeof(uint8_t));
- const uint16_t unused16 = 0;
- ptr += copy_to_mem(&unused16, ptr, sizeof(uint16_t));
- if (!is_empty()) {
- const uint32_t num_items = map.get_num_active();
- ptr += copy_to_mem(&num_items, ptr, sizeof(uint32_t));
- const uint32_t unused32 = 0;
- ptr += copy_to_mem(&unused32, ptr, sizeof(uint32_t));
- ptr += copy_to_mem(&total_weight, ptr, sizeof(uint64_t));
- ptr += copy_to_mem(&offset, ptr, sizeof(uint64_t));
-
- // copy active items and their weights to use batch serialization
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
- uint64_t* weights = AllocU64().allocate(num_items);
- T* items = A().allocate(num_items);
- uint32_t i = 0;
- for (auto &it: map) {
- new (&items[i]) T(it.first);
- weights[i++] = it.second;
- }
- ptr += copy_to_mem(weights, ptr, sizeof(uint64_t) * num_items);
- AllocU64().deallocate(weights, num_items);
- ptr += S().serialize(ptr, items, num_items);
- for (unsigned i = 0; i < num_items; i++) items[i].~T();
- A().deallocate(items, num_items);
- }
- return bytes;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, H, E, S, A> frequent_items_sketch<T, H, E, S, A>::deserialize(std::istream& is) {
- uint8_t preamble_longs;
- is.read((char*)&preamble_longs, sizeof(preamble_longs));
- uint8_t serial_version;
- is.read((char*)&serial_version, sizeof(serial_version));
- uint8_t family_id;
- is.read((char*)&family_id, sizeof(family_id));
- uint8_t lg_max_size;
- is.read((char*)&lg_max_size, sizeof(lg_max_size));
- uint8_t lg_cur_size;
- is.read((char*)&lg_cur_size, sizeof(lg_cur_size));
- uint8_t flags_byte;
- is.read((char*)&flags_byte, sizeof(flags_byte));
- uint16_t unused16;
- is.read((char*)&unused16, sizeof(unused16));
-
- const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
-
- check_preamble_longs(preamble_longs, is_empty);
- check_serial_version(serial_version);
- check_family_id(family_id);
- check_size(lg_cur_size, lg_max_size);
-
- frequent_items_sketch<T, H, E, S, A> sketch(lg_cur_size, lg_max_size);
- if (!is_empty) {
- uint32_t num_items;
- is.read((char*)&num_items, sizeof(num_items));
- uint32_t unused32;
- is.read((char*)&unused32, sizeof(unused32));
- uint64_t total_weight;
- is.read((char*)&total_weight, sizeof(total_weight));
- uint64_t offset;
- is.read((char*)&offset, sizeof(offset));
-
- // batch deserialization with intermediate array of items and weights
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
- uint64_t* weights = AllocU64().allocate(num_items);
- is.read((char*)weights, sizeof(uint64_t) * num_items);
- T* items = A().allocate(num_items); // rely on serde to construct items
- S().deserialize(is, items, num_items);
- for (uint32_t i = 0; i < num_items; i++) {
- sketch.update(std::move(items[i]), weights[i]);
- items[i].~T();
- }
- AllocU64().deallocate(weights, num_items);
- A().deallocate(items, num_items);
-
- sketch.total_weight = total_weight;
- sketch.offset = offset;
- }
- return sketch;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-frequent_items_sketch<T, H, E, S, A> frequent_items_sketch<T, H, E, S, A>::deserialize(const void* bytes, size_t size) {
- const char* ptr = static_cast<const char*>(bytes);
- uint8_t preamble_longs;
- ptr += copy_from_mem(ptr, &preamble_longs, sizeof(uint8_t));
- uint8_t serial_version;
- ptr += copy_from_mem(ptr, &serial_version, sizeof(uint8_t));
- uint8_t family_id;
- ptr += copy_from_mem(ptr, &family_id, sizeof(uint8_t));
- uint8_t lg_max_size;
- ptr += copy_from_mem(ptr, &lg_max_size, sizeof(uint8_t));
- uint8_t lg_cur_size;
- ptr += copy_from_mem(ptr, &lg_cur_size, sizeof(uint8_t));
- uint8_t flags_byte;
- ptr += copy_from_mem(ptr, &flags_byte, sizeof(uint8_t));
- uint16_t unused16;
- ptr += copy_from_mem(ptr, &unused16, sizeof(uint16_t));
-
- const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
-
- check_preamble_longs(preamble_longs, is_empty);
- check_serial_version(serial_version);
- check_family_id(family_id);
- check_size(lg_cur_size, lg_max_size);
-
- frequent_items_sketch<T, H, E, S, A> sketch(lg_cur_size, lg_max_size);
- if (!is_empty) {
- uint32_t num_items;
- ptr += copy_from_mem(ptr, &num_items, sizeof(uint32_t));
- uint32_t unused32;
- ptr += copy_from_mem(ptr, &unused32, sizeof(uint32_t));
- uint64_t total_weight;
- ptr += copy_from_mem(ptr, &total_weight, sizeof(uint64_t));
- uint64_t offset;
- ptr += copy_from_mem(ptr, &offset, sizeof(uint64_t));
-
- // batch deserialization with intermediate array of items and weights
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
- uint64_t* weights = AllocU64().allocate(num_items);
- ptr += copy_from_mem(ptr, weights, sizeof(uint64_t) * num_items);
- T* items = A().allocate(num_items);
- ptr += S().deserialize(ptr, items, num_items);
- for (uint32_t i = 0; i < num_items; i++) {
- sketch.update(std::move(items[i]), weights[i]);
- items[i].~T();
- }
- AllocU64().deallocate(weights, num_items);
- A().deallocate(items, num_items);
-
- sketch.total_weight = total_weight;
- sketch.offset = offset;
- }
- return sketch;
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::check_preamble_longs(uint8_t preamble_longs, bool is_empty) {
- if (is_empty) {
- if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
- throw std::invalid_argument("Possible corruption: preamble longs of an empty sketch must be " + std::to_string(PREAMBLE_LONGS_EMPTY) + ": " + std::to_string(preamble_longs));
- }
- } else {
- if (preamble_longs != PREAMBLE_LONGS_NONEMPTY) {
- throw std::invalid_argument("Possible corruption: preamble longs of an non-empty sketch must be " + std::to_string(PREAMBLE_LONGS_NONEMPTY) + ": " + std::to_string(preamble_longs));
- }
- }
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::check_serial_version(uint8_t serial_version) {
- if (serial_version != SERIAL_VERSION) {
- throw std::invalid_argument("Possible corruption: serial version must be " + std::to_string(SERIAL_VERSION) + ": " + std::to_string(serial_version));
- }
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::check_family_id(uint8_t family_id) {
- if (family_id != FAMILY_ID) {
- throw std::invalid_argument("Possible corruption: family ID must be " + std::to_string(FAMILY_ID) + ": " + std::to_string(family_id));
- }
-}
-
-template<typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::check_size(uint8_t lg_cur_size, uint8_t lg_max_size) {
- if (lg_cur_size > lg_max_size) {
- throw std::invalid_argument("Possible corruption: expected lg_cur_size <= lg_max_size: " + std::to_string(lg_cur_size) + " <= " + std::to_string(lg_max_size));
- }
- if (lg_cur_size < LG_MIN_MAP_SIZE) {
- throw std::invalid_argument("Possible corruption: lg_cur_size must not be less than " + std::to_string(LG_MIN_MAP_SIZE) + ": " + std::to_string(lg_cur_size));
- }
-}
-
-template <typename T, typename H, typename E, typename S, typename A>
-void frequent_items_sketch<T, H, E, S, A>::to_stream(std::ostream& os, bool print_items) const {
- os << "### Frequent items sketch summary:" << std::endl;
- os << " lg cur map size : " << (int) map.get_lg_cur_size() << std::endl;
- os << " lg max map size : " << (int) map.get_lg_max_size() << std::endl;
- os << " num active items : " << get_num_active_items() << std::endl;
- os << " total weight : " << get_total_weight() << std::endl;
- os << " max error : " << get_maximum_error() << std::endl;
- os << "### End sketch summary" << std::endl;
- if (print_items) {
- std::vector<row, AllocRow> items;
- for (auto &it: map) {
- items.push_back(row(&it.first, it.second, offset));
- }
- // sort by estimate in descending order
- std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
- os << "### Items in descending order by estimate" << std::endl;
- os << " item, estimate, lower bound, upper bound" << std::endl;
- for (auto &it: items) {
- os << " " << it.get_item() << ", " << it.get_estimate() << ", "
- << it.get_lower_bound() << ", " << it.get_upper_bound() << std::endl;
- }
- os << "### End items" << std::endl;
- }
}
-} /* namespace datasketches */
+#include "frequent_items_sketch_impl.hpp"
# endif
diff --git a/fi/include/frequent_items_sketch.hpp b/fi/include/frequent_items_sketch_impl.hpp
similarity index 85%
copy from fi/include/frequent_items_sketch.hpp
copy to fi/include/frequent_items_sketch_impl.hpp
index ed7b70f..5446bc8 100644
--- a/fi/include/frequent_items_sketch.hpp
+++ b/fi/include/frequent_items_sketch_impl.hpp
@@ -17,81 +17,14 @@
* under the License.
*/
-#ifndef FREQUENT_ITEMS_SKETCH_HPP_
-#define FREQUENT_ITEMS_SKETCH_HPP_
+#ifndef FREQUENT_ITEMS_SKETCH_IMPL_HPP_
+#define FREQUENT_ITEMS_SKETCH_IMPL_HPP_
-#include <memory>
-#include <vector>
-#include <iostream>
#include <streambuf>
#include <cstring>
-#include <functional>
-
-#include "reverse_purge_hash_map.hpp"
-#include "serde.hpp"
namespace datasketches {
-/*
- * Based on Java implementation here:
- * https://github.com/DataSketches/sketches-core/blob/master/src/main/java/com/yahoo/sketches/frequencies/ItemsSketch.java
- * author Alexander Saydakov
- */
-
-enum frequent_items_error_type { NO_FALSE_POSITIVES, NO_FALSE_NEGATIVES };
-
-// for serialization as raw bytes
-template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
-template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
-
-template<typename T, typename H = std::hash<T>, typename E = std::equal_to<T>, typename S = serde<T>, typename A = std::allocator<T>>
-class frequent_items_sketch {
-public:
- static const uint64_t USE_MAX_ERROR = 0; // used in get_frequent_items
-
- explicit frequent_items_sketch(uint8_t lg_max_map_size);
- frequent_items_sketch(uint8_t lg_start_map_size, uint8_t lg_max_map_size);
- class row;
- void update(const T& item, uint64_t weight = 1);
- void update(T&& item, uint64_t weight = 1);
- void merge(const frequent_items_sketch& other);
- void merge(frequent_items_sketch&& other);
- bool is_empty() const;
- uint32_t get_num_active_items() const;
- uint64_t get_total_weight() const;
- uint64_t get_estimate(const T& item) const;
- uint64_t get_lower_bound(const T& item) const;
- uint64_t get_upper_bound(const T& item) const;
- uint64_t get_maximum_error() const;
- double get_epsilon() const;
- static double get_epsilon(uint8_t lg_max_map_size);
- static double get_apriori_error(uint8_t lg_max_map_size, uint64_t estimated_total_weight);
- typedef typename std::allocator_traits<A>::template rebind_alloc<row> AllocRow;
- std::vector<row, AllocRow> get_frequent_items(frequent_items_error_type err_type, uint64_t threshold = USE_MAX_ERROR) const;
- size_t get_serialized_size_bytes() const;
- void serialize(std::ostream& os) const;
- typedef vector_u8<A> vector_bytes; // alias for users
- vector_bytes serialize(unsigned header_size_bytes = 0) const;
- static frequent_items_sketch deserialize(std::istream& is);
- static frequent_items_sketch deserialize(const void* bytes, size_t size);
- void to_stream(std::ostream& os, bool print_items = false) const;
-private:
- static const uint8_t LG_MIN_MAP_SIZE = 3;
- static const uint8_t SERIAL_VERSION = 1;
- static const uint8_t FAMILY_ID = 10;
- static const uint8_t PREAMBLE_LONGS_EMPTY = 1;
- static const uint8_t PREAMBLE_LONGS_NONEMPTY = 4;
- static constexpr double EPSILON_FACTOR = 3.5;
- enum flags { IS_EMPTY };
- uint64_t total_weight;
- uint64_t offset;
- reverse_purge_hash_map<T, H, E, A> map;
- static void check_preamble_longs(uint8_t preamble_longs, bool is_empty);
- static void check_serial_version(uint8_t serial_version);
- static void check_family_id(uint8_t family_id);
- static void check_size(uint8_t lg_cur_size, uint8_t lg_max_size);
-};
-
// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
template<typename T, typename H, typename E, typename S, typename A>
const uint8_t frequent_items_sketch<T, H, E, S, A>::LG_MIN_MAP_SIZE;
@@ -507,6 +440,6 @@ void frequent_items_sketch<T, H, E, S, A>::to_stream(std::ostream& os, bool prin
}
}
-} /* namespace datasketches */
+}
-# endif
+#endif
diff --git a/fi/include/reverse_purge_hash_map.hpp b/fi/include/reverse_purge_hash_map.hpp
index f91c67e..d08d260 100644
--- a/fi/include/reverse_purge_hash_map.hpp
+++ b/fi/include/reverse_purge_hash_map.hpp
@@ -21,13 +21,7 @@
#define REVERSE_PURGE_HASH_MAP_HPP_
#include <memory>
-#include <algorithm>
#include <iterator>
-#include <cmath>
-
-#if defined(_MSC_VER)
-#include <iso646.h> // for and/or keywords
-#endif // _MSC_VER
namespace datasketches {
@@ -80,10 +74,6 @@ private:
uint64_t purge();
};
-// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
-template<typename T, typename H, typename E, typename A>
-constexpr uint32_t reverse_purge_hash_map<T, H, E, A>::MAX_SAMPLE_SIZE;
-
// This iterator uses strides based on golden ratio to avoid clustering during merge
template<typename T, typename H, typename E, typename A>
class reverse_purge_hash_map<T, H, E, A>::iterator: public std::iterator<std::input_iterator_tag, T> {
@@ -115,308 +105,8 @@ private:
map(map), index(index), count(count), stride(static_cast<uint32_t>((1 << map->lg_cur_size) * GOLDEN_RATIO_RECIPROCAL) | 1) {}
};
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>::reverse_purge_hash_map(uint8_t lg_cur_size, uint8_t lg_max_size):
-lg_cur_size(lg_cur_size),
-lg_max_size(lg_max_size),
-num_active(0),
-keys(A().allocate(1 << lg_cur_size)),
-values(AllocU64().allocate(1 << lg_cur_size)),
-states(AllocU16().allocate(1 << lg_cur_size))
-{
- std::fill(states, &states[1 << lg_cur_size], 0);
-}
-
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>::reverse_purge_hash_map(const reverse_purge_hash_map<T, H, E, A>& other):
-lg_cur_size(other.lg_cur_size),
-lg_max_size(other.lg_max_size),
-num_active(other.num_active),
-keys(A().allocate(1 << lg_cur_size)),
-values(AllocU64().allocate(1 << lg_cur_size)),
-states(AllocU16().allocate(1 << lg_cur_size))
-{
- const uint32_t size = 1 << lg_cur_size;
- if (num_active > 0) {
- auto num = num_active;
- for (uint32_t i = 0; i < size; i++) {
- if (other.states[i] > 0) {
- new (&keys[i]) T(other.keys[i]);
- values[i] = other.values[i];
- }
- if (--num == 0) break;
- }
- }
- std::copy(&other.states[0], &other.states[size], states);
-}
-
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>::reverse_purge_hash_map(reverse_purge_hash_map<T, H, E, A>&& other) noexcept:
-lg_cur_size(other.lg_cur_size),
-lg_max_size(other.lg_max_size),
-num_active(other.num_active),
-keys(nullptr),
-values(nullptr),
-states(nullptr)
-{
- std::swap(keys, other.keys);
- std::swap(values, other.values);
- std::swap(states, other.states);
- other.num_active = 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>::~reverse_purge_hash_map() {
- const uint32_t size = 1 << lg_cur_size;
- if (num_active > 0) {
- for (uint32_t i = 0; i < size; i++) {
- if (is_active(i)) keys[i].~T();
- if (--num_active == 0) break;
- }
- }
- A().deallocate(keys, size);
- AllocU64().deallocate(values, size);
- AllocU16().deallocate(states, size);
-}
-
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>& reverse_purge_hash_map<T, H, E, A>::operator=(reverse_purge_hash_map<T, H, E, A> other) {
- std::swap(lg_cur_size, other.lg_cur_size);
- std::swap(lg_max_size, other.lg_max_size);
- std::swap(num_active, other.num_active);
- std::swap(keys, other.keys);
- std::swap(values, other.values);
- std::swap(states, other.states);
- return *this;
-}
-
-template<typename T, typename H, typename E, typename A>
-reverse_purge_hash_map<T, H, E, A>& reverse_purge_hash_map<T, H, E, A>::operator=(reverse_purge_hash_map<T, H, E, A>&& other) {
- std::swap(lg_cur_size, other.lg_cur_size);
- std::swap(lg_max_size, other.lg_max_size);
- std::swap(num_active, other.num_active);
- std::swap(keys, other.keys);
- std::swap(values, other.values);
- std::swap(states, other.states);
- return *this;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint64_t reverse_purge_hash_map<T, H, E, A>::adjust_or_insert(const T& key, uint64_t value) {
- const uint32_t num_active_before = num_active;
- const uint32_t index = internal_adjust_or_insert(key, value);
- if (num_active > num_active_before) {
- new (&keys[index]) T(key);
- return resize_or_purge_if_needed();
- }
- return 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint64_t reverse_purge_hash_map<T, H, E, A>::adjust_or_insert(T&& key, uint64_t value) {
- const uint32_t num_active_before = num_active;
- const uint32_t index = internal_adjust_or_insert(key, value);
- if (num_active > num_active_before) {
- new (&keys[index]) T(std::move(key));
- return resize_or_purge_if_needed();
- }
- return 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint64_t reverse_purge_hash_map<T, H, E, A>::get(const T& key) const {
- const uint32_t mask = (1 << lg_cur_size) - 1;
- uint32_t probe = H()(key) & mask;
- while (is_active(probe)) {
- if (E()(keys[probe], key)) return values[probe];
- probe = (probe + 1) & mask;
- }
- return 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint8_t reverse_purge_hash_map<T, H, E, A>::get_lg_cur_size() const {
- return lg_cur_size;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint8_t reverse_purge_hash_map<T, H, E, A>::get_lg_max_size() const {
- return lg_max_size;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint32_t reverse_purge_hash_map<T, H, E, A>::get_capacity() const {
- return (1 << lg_cur_size) * LOAD_FACTOR;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint32_t reverse_purge_hash_map<T, H, E, A>::get_num_active() const {
- return num_active;
-}
-
-template<typename T, typename H, typename E, typename A>
-typename reverse_purge_hash_map<T, H, E, A>::iterator reverse_purge_hash_map<T, H, E, A>::begin() const {
- const uint32_t size = 1 << lg_cur_size;
- uint32_t i = 0;
- while (i < size and !is_active(i)) i++;
- return reverse_purge_hash_map<T, H, E, A>::iterator(this, i, 0);
-}
-
-template<typename T, typename H, typename E, typename A>
-typename reverse_purge_hash_map<T, H, E, A>::iterator reverse_purge_hash_map<T, H, E, A>::end() const {
- return reverse_purge_hash_map<T, H, E, A>::iterator(this, 1 << lg_cur_size, num_active);
-}
-
-template<typename T, typename H, typename E, typename A>
-bool reverse_purge_hash_map<T, H, E, A>::is_active(uint32_t index) const {
- return states[index] > 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-void reverse_purge_hash_map<T, H, E, A>::subtract_and_keep_positive_only(uint64_t amount) {
- // starting from the back, find the first empty cell,
- // which establishes the high end of a cluster.
- uint32_t first_probe = (1 << lg_cur_size) - 1;
- while (is_active(first_probe)) first_probe--;
- // when we find the next non-empty cell, we know we are at the high end of a cluster
- // work towards the front, delete any non-positive entries.
- for (uint32_t probe = first_probe; probe-- > 0;) {
- if (is_active(probe)) {
- if (values[probe] <= amount) {
- hash_delete(probe); // does the work of deletion and moving higher items towards the front
- num_active--;
- } else {
- values[probe] -= amount;
- }
- }
- }
- // now work on the first cluster that was skipped
- for (uint32_t probe = (1 << lg_cur_size); probe-- > first_probe;) {
- if (is_active(probe)) {
- if (values[probe] <= amount) {
- hash_delete(probe);
- num_active--;
- } else {
- values[probe] -= amount;
- }
- }
- }
-}
-
-template<typename T, typename H, typename E, typename A>
-void reverse_purge_hash_map<T, H, E, A>::hash_delete(uint32_t delete_index) {
- // Looks ahead in the table to search for another
- // item to move to this location
- // if none are found, the status is changed
- states[delete_index] = 0; // mark as empty
- keys[delete_index].~T();
- uint32_t drift = 1;
- const uint32_t mask = (1 << lg_cur_size) - 1;
- uint32_t probe = (delete_index + drift) & mask; // map length must be a power of 2
- // advance until we find a free location replacing locations as needed
- while (is_active(probe)) {
- if (states[probe] > drift) {
- // move current element
- new (&keys[delete_index]) T(std::move(keys[probe]));
- values[delete_index] = values[probe];
- states[delete_index] = states[probe] - drift;
- states[probe] = 0; // mark as empty
- keys[probe].~T();
- drift = 0;
- delete_index = probe;
- }
- probe = (probe + 1) & mask;
- drift++;
- // only used for theoretical analysis
- if (drift >= DRIFT_LIMIT) throw std::logic_error("drift: " + std::to_string(drift) + " >= DRIFT_LIMIT");
- }
-}
-
-template<typename T, typename H, typename E, typename A>
-uint32_t reverse_purge_hash_map<T, H, E, A>::internal_adjust_or_insert(const T& key, uint64_t value) {
- const uint32_t mask = (1 << lg_cur_size) - 1;
- uint32_t index = H()(key) & mask;
- uint16_t drift = 1;
- while (is_active(index)) {
- if (E()(keys[index], key)) {
- // adjusting the value of an existing key
- values[index] += value;
- return index;
- }
- index = (index + 1) & mask;
- drift++;
- // only used for theoretical analysis
- if (drift >= DRIFT_LIMIT) throw std::logic_error("drift limit reached");
- }
- // adding the key and value to the table
- if (num_active > get_capacity()) {
- throw std::logic_error("num_active " + std::to_string(num_active) + " > capacity " + std::to_string(get_capacity()));
- }
- values[index] = value;
- states[index] = drift;
- num_active++;
- return index;
-}
-
-template<typename T, typename H, typename E, typename A>
-uint64_t reverse_purge_hash_map<T, H, E, A>::resize_or_purge_if_needed() {
- if (num_active > get_capacity()) {
- if (lg_cur_size < lg_max_size) { // can grow
- resize(lg_cur_size + 1);
- } else { // at target size, must purge
- const uint64_t offset = purge();
- if (num_active > get_capacity()) {
- throw std::logic_error("purge did not reduce number of active items");
- }
- return offset;
- }
- }
- return 0;
-}
-
-template<typename T, typename H, typename E, typename A>
-void reverse_purge_hash_map<T, H, E, A>::resize(uint8_t lg_new_size) {
- const uint32_t old_size = 1 << lg_cur_size;
- T* old_keys = keys;
- uint64_t* old_values = values;
- uint16_t* old_states = states;
- const uint32_t new_size = 1 << lg_new_size;
- keys = A().allocate(new_size);
- values = AllocU64().allocate(new_size);
- states = AllocU16().allocate(new_size);
- std::fill(states, &states[new_size], 0);
- num_active = 0;
- lg_cur_size = lg_new_size;
- for (uint32_t i = 0; i < old_size; i++) {
- if (old_states[i] > 0) {
- adjust_or_insert(std::move(old_keys[i]), old_values[i]);
- old_keys[i].~T();
- }
- }
- A().deallocate(old_keys, old_size);
- AllocU64().deallocate(old_values, old_size);
- AllocU16().deallocate(old_states, old_size);
-}
-
-template<typename T, typename H, typename E, typename A>
-uint64_t reverse_purge_hash_map<T, H, E, A>::purge() {
- const uint32_t limit = std::min(MAX_SAMPLE_SIZE, num_active);
- uint32_t num_samples = 0;
- uint32_t i = 0;
- uint64_t* samples = AllocU64().allocate(limit);
- while (num_samples < limit) {
- if (is_active(i)) {
- samples[num_samples++] = values[i];
- }
- i++;
- }
- std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples - 1]);
- const uint64_t median = samples[num_samples / 2];
- AllocU64().deallocate(samples, limit);
- subtract_and_keep_positive_only(median);
- return median;
-}
-
} /* namespace datasketches */
+#include "reverse_purge_hash_map_impl.hpp"
+
# endif
diff --git a/fi/include/reverse_purge_hash_map.hpp b/fi/include/reverse_purge_hash_map_impl.hpp
similarity index 77%
copy from fi/include/reverse_purge_hash_map.hpp
copy to fi/include/reverse_purge_hash_map_impl.hpp
index f91c67e..4fd8af7 100644
--- a/fi/include/reverse_purge_hash_map.hpp
+++ b/fi/include/reverse_purge_hash_map_impl.hpp
@@ -17,8 +17,8 @@
* under the License.
*/
-#ifndef REVERSE_PURGE_HASH_MAP_HPP_
-#define REVERSE_PURGE_HASH_MAP_HPP_
+#ifndef REVERSE_PURGE_HASH_MAP_IMPL_HPP_
+#define REVERSE_PURGE_HASH_MAP_IMPL_HPP_
#include <memory>
#include <algorithm>
@@ -31,90 +31,10 @@
namespace datasketches {
-/*
- * Based on Java implementation here:
- * https://github.com/DataSketches/sketches-core/blob/master/src/main/java/com/yahoo/sketches/frequencies/ReversePurgeItemHashMap.java
- * author Alexander Saydakov
- */
-
-template<typename T, typename H = std::hash<T>, typename E = std::equal_to<T>, typename A = std::allocator<T>>
-class reverse_purge_hash_map {
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint16_t> AllocU16;
- typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
-
-public:
- reverse_purge_hash_map(uint8_t lg_size, uint8_t lg_max_size);
- reverse_purge_hash_map(const reverse_purge_hash_map& other);
- reverse_purge_hash_map(reverse_purge_hash_map&& other) noexcept;
- ~reverse_purge_hash_map();
- reverse_purge_hash_map& operator=(reverse_purge_hash_map other);
- reverse_purge_hash_map& operator=(reverse_purge_hash_map&& other);
- uint64_t adjust_or_insert(const T& key, uint64_t value);
- uint64_t adjust_or_insert(T&& key, uint64_t value);
- uint64_t get(const T& key) const;
- uint8_t get_lg_cur_size() const;
- uint8_t get_lg_max_size() const;
- uint32_t get_capacity() const;
- uint32_t get_num_active() const;
- class iterator;
- iterator begin() const;
- iterator end() const;
-private:
- static constexpr double LOAD_FACTOR = 0.75;
- static constexpr uint16_t DRIFT_LIMIT = 1024; // used only for stress testing
- static constexpr uint32_t MAX_SAMPLE_SIZE = 1024; // number of samples to compute approximate median during purge
-
- uint8_t lg_cur_size;
- uint8_t lg_max_size;
- uint32_t num_active;
- T* keys;
- uint64_t* values;
- uint16_t* states;
-
- inline bool is_active(uint32_t probe) const;
- void subtract_and_keep_positive_only(uint64_t amount);
- void hash_delete(uint32_t probe);
- uint32_t internal_adjust_or_insert(const T& key, uint64_t value);
- uint64_t resize_or_purge_if_needed();
- void resize(uint8_t lg_new_size);
- uint64_t purge();
-};
-
// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug'
template<typename T, typename H, typename E, typename A>
constexpr uint32_t reverse_purge_hash_map<T, H, E, A>::MAX_SAMPLE_SIZE;
-// This iterator uses strides based on golden ratio to avoid clustering during merge
-template<typename T, typename H, typename E, typename A>
-class reverse_purge_hash_map<T, H, E, A>::iterator: public std::iterator<std::input_iterator_tag, T> {
-public:
- friend class reverse_purge_hash_map<T, H, E, A>;
- iterator& operator++() {
- ++count;
- if (count < map->num_active) {
- const uint32_t mask = (1 << map->lg_cur_size) - 1;
- do {
- index = (index + stride) & mask;
- } while (!map->is_active(index));
- }
- return *this;
- }
- iterator operator++(int) { iterator tmp(*this); operator++(); return tmp; }
- bool operator==(const iterator& rhs) const { return count == rhs.count; }
- bool operator!=(const iterator& rhs) const { return count != rhs.count; }
- const std::pair<T&, uint64_t> operator*() const {
- return std::pair<T&, uint64_t>(map->keys[index], map->values[index]);
- }
-private:
- static constexpr double GOLDEN_RATIO_RECIPROCAL = 0.6180339887498949; // = (sqrt(5) - 1) / 2
- const reverse_purge_hash_map<T, H, E, A>* map;
- uint32_t index;
- uint32_t count;
- uint32_t stride;
- iterator(const reverse_purge_hash_map<T, H, E, A>* map, uint32_t index, uint32_t count):
- map(map), index(index), count(count), stride(static_cast<uint32_t>((1 << map->lg_cur_size) * GOLDEN_RATIO_RECIPROCAL) | 1) {}
-};
-
template<typename T, typename H, typename E, typename A>
reverse_purge_hash_map<T, H, E, A>::reverse_purge_hash_map(uint8_t lg_cur_size, uint8_t lg_max_size):
lg_cur_size(lg_cur_size),
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index d33c945..f46f335 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -183,31 +183,13 @@ class kll_sketch {
vector_d<A> get_CDF(const T* split_points, uint32_t size) const;
double get_normalized_rank_error(bool pmf) const;
- // implementation for fixed-size arithmetic types (integral and floating point)
+ // version for fixed-size arithmetic types (integral and floating point)
template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
- size_t get_serialized_size_bytes() const {
- if (is_empty()) { return EMPTY_SIZE_BYTES; }
- if (num_levels_ == 1 and get_num_retained() == 1) {
- return DATA_START_SINGLE_ITEM + sizeof(TT);
- }
- // the last integer in the levels_ array is not serialized because it can be derived
- return DATA_START + num_levels_ * sizeof(uint32_t) + (get_num_retained() + 2) * sizeof(TT);
- }
+ size_t get_serialized_size_bytes() const;
- // implementation for all other types
+ // version for all other types
template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
- size_t get_serialized_size_bytes() const {
- if (is_empty()) { return EMPTY_SIZE_BYTES; }
- if (num_levels_ == 1 and get_num_retained() == 1) {
- return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
- }
- // the last integer in the levels_ array is not serialized because it can be derived
- size_t size = DATA_START + num_levels_ * sizeof(uint32_t);
- size += S().size_of_item(*min_value_);
- size += S().size_of_item(*max_value_);
- for (auto& it: *this) size += S().size_of_item(it.first);
- return size;
- }
+ size_t get_serialized_size_bytes() const;
void serialize(std::ostream& os) const;
typedef vector_u8<A> vector_bytes; // alias for users
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index 55a97f6..9ead31a 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -338,6 +338,34 @@ double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {
return get_normalized_rank_error(min_k_, pmf);
}
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+ if (is_empty()) { return EMPTY_SIZE_BYTES; }
+ if (num_levels_ == 1 and get_num_retained() == 1) {
+ return DATA_START_SINGLE_ITEM + sizeof(TT);
+ }
+ // the last integer in the levels_ array is not serialized because it can be derived
+ return DATA_START + num_levels_ * sizeof(uint32_t) + (get_num_retained() + 2) * sizeof(TT);
+}
+
+// implementation for all other types
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+ if (is_empty()) { return EMPTY_SIZE_BYTES; }
+ if (num_levels_ == 1 and get_num_retained() == 1) {
+ return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
+ }
+ // the last integer in the levels_ array is not serialized because it can be derived
+ size_t size = DATA_START + num_levels_ * sizeof(uint32_t);
+ size += S().size_of_item(*min_value_);
+ size += S().size_of_item(*max_value_);
+ for (auto& it: *this) size += S().size_of_item(it.first);
+ return size;
+}
+
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
const bool is_single_item = n_ == 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org