You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/10/22 20:47:38 UTC
[incubator-datasketches-cpp] 01/01: draft,
no rank and quantile queries yet
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git
commit d46a046bfc82cb0a8d785e49331d9a6be96cf19f
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Thu Oct 22 13:47:21 2020 -0700
draft, no rank and quantile queries yet
---
CMakeLists.txt | 1 +
req/CMakeLists.txt | 48 +++++++
req/include/req_sketch.hpp | 145 +++++++++++++++++++++
req/include/req_sketch_impl.hpp | 281 ++++++++++++++++++++++++++++++++++++++++
req/test/CMakeLists.txt | 42 ++++++
req/test/req_sketch_test.cpp | 70 ++++++++++
6 files changed, 587 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff1d4a5..d3c1fef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -96,6 +96,7 @@ add_subdirectory(fi)
add_subdirectory(theta)
add_subdirectory(sampling)
add_subdirectory(tuple)
+add_subdirectory(req)
if (WITH_PYTHON)
add_subdirectory(python)
diff --git a/req/CMakeLists.txt b/req/CMakeLists.txt
new file mode 100755
index 0000000..b3cfae3
--- /dev/null
+++ b/req/CMakeLists.txt
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Header-only REQ sketch library.
+add_library(req INTERFACE)
+
+# Namespaced alias for this module. It must be unique per target: the previous
+# value (::THETA) was copied from another module and names the wrong library.
+add_library(${PROJECT_NAME}::REQ ALIAS req)
+
+if (BUILD_TESTS)
+  add_subdirectory(test)
+endif()
+
+target_include_directories(req
+  INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+    $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
+)
+
+target_link_libraries(req INTERFACE common)
+target_compile_features(req INTERFACE cxx_std_11)
+
+# req_sketch.hpp includes req_sketch_impl.hpp, so both headers must be installed.
+set(req_HEADERS "")
+list(APPEND req_HEADERS "include/req_sketch.hpp")
+list(APPEND req_HEADERS "include/req_sketch_impl.hpp")
+
+install(TARGETS req
+  EXPORT ${PROJECT_NAME}
+)
+
+install(FILES ${req_HEADERS}
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches")
+
+target_sources(req
+  INTERFACE
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch_impl.hpp
+)
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
new file mode 100755
index 0000000..53039aa
--- /dev/null
+++ b/req/include/req_sketch.hpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_HPP_
+#define REQ_SKETCH_HPP_
+
+#include <algorithm>
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include "serde.hpp"
+#include "common_defs.hpp"
+
+namespace datasketches {
+
+// TODO: have a common random bit with KLL
+// Source of random single bits used for the compaction coin flip.
+// The Mersenne Twister is seeded directly and then handed to the adaptor:
+// the adaptor's own seeding constructor takes its result_type (uint8_t here),
+// which would cut the clock value down to 8 bits, i.e. only 256 possible sequences.
+static std::independent_bits_engine<std::mt19937, 1, uint8_t> req_random_bit(
+  std::mt19937(static_cast<std::mt19937::result_type>(
+    std::chrono::system_clock::now().time_since_epoch().count())));
+
+// Tuning constants used by the compactor implementation.
+namespace req_constants {
+  // number of sections a newly constructed compactor starts with
+  static const uint32_t INIT_NUM_SECTIONS = 3;
+  // lower bound on the section size: sections are not shrunk below this value
+  static const uint32_t MIN_K = 4;
+}
+
+// One level of the sketch: a buffer of items plus the bookkeeping of its compaction schedule.
+// T - item type
+// IsHighRank - selects which end of the buffer gets compacted (true: the front, false: the back)
+// Comparator - ordering used when sorting the buffer
+// Allocator - allocator of the item buffer
+template<
+typename T,
+bool IsHighRank,
+typename Comparator,
+typename Allocator
+>
+class req_compactor {
+public:
+ // Creates an empty compactor; section_size is the initial size of one section.
+ req_compactor(uint8_t lg_weight, uint32_t section_size);
+
+ // True unless an item was appended since the last sort().
+ bool is_sorted() const;
+ // Number of items currently held in the buffer.
+ uint32_t get_num_items() const;
+ // Nominal capacity: 2 * number of sections * section size.
+ uint32_t get_nom_capacity() const;
+
+ // Adds one item at the end of the buffer and marks the buffer as unsorted.
+ template<typename FwdT>
+ void append(FwdT&& item);
+
+ // Sorts the buffer using Comparator.
+ void sort();
+ // Merges the given items into the buffer; throws std::logic_error if the buffer is not sorted.
+ void merge_sort_in(std::vector<T, Allocator>&& items);
+
+ // Removes a range of items from the buffer and returns every other item of that range.
+ std::vector<T, Allocator> compact();
+
+private:
+ uint8_t lg_weight_;
+ bool coin_; // random bit for compaction
+ bool sorted_;
+ double section_size_raw_; // section size before rounding to an even integer
+ uint32_t section_size_;
+ uint32_t num_sections_;
+ uint32_t num_compactions_;
+ uint32_t state_; // state of the deterministic compaction schedule
+ std::vector<T, Allocator> items_;
+
+ // Doubles the number of sections and shrinks the section size when the schedule calls for it.
+ bool ensure_enough_sections();
+ // Returns the range to compact packed into one value: start index in the low 32 bits, end index in the high 32 bits.
+ // NOTE(review): the packing needs a 64-bit size_t - confirm 32-bit targets are out of scope.
+ size_t compute_compaction_range(uint32_t secs_to_compact) const;
+
+ // Rounds to the nearest even integer.
+ static uint32_t nearest_even(double value);
+
+ // Copies every other item of [from, to); flag selects the items at odd offsets instead of even ones.
+ template<typename Iter>
+ static std::vector<T, Allocator> get_evens_or_odds(Iter from, Iter to, bool flag);
+};
+
+/**
+ * REQ sketch (draft): a stack of compactors that is fed through update().
+ * Rank and quantile queries are not available yet.
+ *
+ * T - item type
+ * IsHighRank - forwarded to the compactors, selects which end of their buffers is compacted
+ * Comparator - ordering of the items
+ * SerDe - serializer / deserializer of the items (not used by this draft)
+ * Allocator - allocator of the items
+ */
+template<
+  typename T,
+  bool IsHighRank,
+  typename Comparator = std::less<T>,
+  typename SerDe = serde<T>,
+  typename Allocator = std::allocator<T>
+>
+class req_sketch {
+public:
+  using Compactor = req_compactor<T, IsHighRank, Comparator, Allocator>;
+
+  /**
+   * Creates an empty sketch holding a single compactor.
+   * Explicit, so that a plain integer is never silently converted into a sketch.
+   * @param k initial section size handed to every compactor
+   * @param allocator allocator instance kept by the sketch
+   */
+  explicit req_sketch(uint32_t k, const Allocator& allocator = Allocator());
+
+  /**
+   * Returns true if this sketch is empty.
+   * @return empty flag
+   */
+  bool is_empty() const;
+
+  /**
+   * Returns the length of the input stream.
+   * @return stream length
+   */
+  uint64_t get_n() const;
+
+  /**
+   * Returns the number of retained items in the sketch.
+   * @return number of retained items
+   */
+  uint32_t get_num_retained() const;
+
+  /**
+   * Returns true if this sketch is in estimation mode.
+   * @return estimation mode flag
+   */
+  bool is_estimation_mode() const;
+
+  /**
+   * Updates this sketch with the given item.
+   * @param item the item to add, taken by forwarding reference
+   */
+  template<typename FwdT>
+  void update(FwdT&& item);
+
+  /**
+   * Prints a summary of the sketch.
+   * @param print_levels if true include information about levels
+   * @param print_items if true include sketch data
+   */
+  string<Allocator> to_string(bool print_levels = false, bool print_items = false) const;
+
+private:
+  Allocator allocator_;
+  uint32_t k_;
+  uint32_t max_nom_size_; // sum of the nominal capacities of all compactors
+  uint32_t num_retained_; // sum of the item counts of all compactors
+  uint64_t n_; // number of items passed to update()
+  using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
+  std::vector<Compactor, AllocCompactor> compactors_; // index 0 receives the raw input
+
+  uint8_t get_num_levels() const;
+  void grow();
+  void update_max_nom_size();
+  void update_num_retained();
+  void compress();
+};
+
+} /* namespace datasketches */
+
+#include "req_sketch_impl.hpp"
+
+#endif
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
new file mode 100755
index 0000000..30c2a86
--- /dev/null
+++ b/req/include/req_sketch_impl.hpp
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_IMPL_HPP_
+#define REQ_SKETCH_IMPL_HPP_
+
+#include <stdexcept>
+#include <cmath>
+#include <sstream>
+
+#include "count_zeros.hpp"
+
+namespace datasketches {
+
+// Creates an empty compactor.
+// section_size seeds both the unrounded (double) and the rounded section size.
+template<typename T, bool H, typename C, typename A>
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size)
+  : lg_weight_(lg_weight)
+  , coin_(false)
+  , sorted_(true) // nothing appended yet
+  , section_size_raw_(section_size)
+  , section_size_(section_size)
+  , num_sections_(req_constants::INIT_NUM_SECTIONS)
+  , num_compactions_(0)
+  , state_(0)
+{
+}
+
+// Returns the sorted flag: cleared by append(), set by sort().
+template<typename T, bool H, typename C, typename A>
+bool req_compactor<T, H, C, A>::is_sorted() const {
+ return sorted_;
+}
+
+// Returns the number of items currently stored in the buffer.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::get_num_items() const {
+  // the narrowing from size_t is spelled out
+  return static_cast<uint32_t>(items_.size());
+}
+
+// Returns the nominal capacity: twice the total size of all sections.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::get_nom_capacity() const {
+  const uint32_t all_sections = num_sections_ * section_size_;
+  return 2 * all_sections;
+}
+
+// Adds one item at the end of the buffer, forwarding it so that rvalues are moved in.
+template<typename T, bool H, typename C, typename A>
+template<typename FwdT>
+void req_compactor<T, H, C, A>::append(FwdT&& item) {
+ items_.push_back(std::forward<FwdT>(item));
+ // the new item is not placed in order, so the buffer can no longer be assumed sorted
+ sorted_ = false;
+}
+
+// Sorts the whole buffer with a default-constructed comparator C and sets the sorted flag.
+template<typename T, bool H, typename C, typename A>
+void req_compactor<T, H, C, A>::sort() {
+ std::sort(items_.begin(), items_.end(), C());
+ sorted_ = true;
+}
+
+// Merges the given items into this (already sorted) buffer.
+// The items are moved out of the argument. They are expected to be ordered by the
+// same comparator C that sort() uses, because the result comes from a merge of two sorted runs.
+// Throws std::logic_error if this buffer is not sorted.
+template<typename T, bool H, typename C, typename A>
+void req_compactor<T, H, C, A>::merge_sort_in(std::vector<T, A>&& items) {
+  if (!sorted_) throw std::logic_error("compactor must be sorted at this point");
+  // remember the boundary as an index: appending invalidates a saved end() iterator
+  const size_t num_existing = items_.size();
+  items_.reserve(num_existing + items.size());
+  std::move(items.begin(), items.end(), std::back_inserter(items_));
+  // merge with the comparator of the sketch, not with operator<, to stay consistent with sort()
+  std::inplace_merge(items_.begin(), items_.begin() + num_existing, items_.end(), C());
+}
+
+// Compacts a part of the buffer: the chosen range is removed and every other item
+// of it (picked by a coin) is returned to the caller.
+// Throws std::logic_error if the chosen range holds fewer than two items.
+template<typename T, bool H, typename C, typename A>
+std::vector<T, A> req_compactor<T, H, C, A>::compact() {
+  // the schedule state decides how many sections take part, limited by the number of sections
+  const uint32_t scheduled = static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1);
+  const uint32_t sections = std::min(scheduled, num_sections_);
+
+  // the range arrives packed into one value
+  const size_t packed = compute_compaction_range(sections);
+  const uint32_t first = packed & 0xFFFFFFFFLL; // low 32
+  const uint32_t last = packed >> 32; // high 32
+  if (last - first < 2) throw std::logic_error("compaction range error");
+
+  if ((num_compactions_ & 1) == 1) {
+    coin_ = !coin_; // odd compaction: reuse the previous coin, inverted
+  } else {
+    coin_ = req_random_bit(); // even compaction: fresh random coin
+  }
+
+  const auto range_begin = items_.begin() + first;
+  const auto range_end = items_.begin() + last;
+  auto promoted = get_evens_or_odds(range_begin, range_end, coin_);
+  items_.erase(range_begin, range_end);
+
+  ++num_compactions_;
+  ++state_;
+  ensure_enough_sections();
+  return promoted;
+}
+
+// Doubles the number of sections and shrinks the section size by a factor of sqrt(2)
+// (rounded to an even integer) once the number of compactions reaches 2^(num_sections - 1),
+// provided the shrunk section size does not drop below req_constants::MIN_K.
+// Returns true if the layout was changed.
+template<typename T, bool H, typename C, typename A>
+bool req_compactor<T, H, C, A>::ensure_enough_sections() {
+  const double ssr = section_size_raw_ / std::sqrt(2.0);
+  const uint32_t ne = nearest_even(ssr);
+  // The threshold is computed in 64 bits: num_sections_ goes 3, 6, 12, 24, 48, ...
+  // and shifting a 32-bit int by 47 is undefined behavior. With 48 sections the
+  // threshold exceeds anything the 32-bit counter can hold, so the shift count stays below 64.
+  const uint64_t threshold = static_cast<uint64_t>(1) << (num_sections_ - 1);
+  if (num_compactions_ >= threshold && ne >= req_constants::MIN_K) {
+    section_size_raw_ = ssr;
+    section_size_ = ne;
+    num_sections_ <<= 1;
+    //ensure_capacity(2 * get_nom_capacity());
+    return true;
+  }
+  return false;
+}
+
+// Computes the index range of the buffer to be compacted.
+// The result is packed into one value: start index in the low 32 bits, end index in the high 32 bits.
+// NOTE(review): the packing relies on size_t being wider than 32 bits - confirm for all targets.
+template<typename T, bool H, typename C, typename A>
+size_t req_compactor<T, H, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
+  const uint32_t count = items_.size();
+  // half of the nominal capacity plus the sections that do not take part stay untouched
+  uint32_t keep = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_;
+  // make compacted region even
+  const bool odd_region = ((count - keep) & 1) == 1;
+  if (odd_region) ++keep;
+  size_t low;
+  size_t high;
+  if (H) {
+    // compact the front of the buffer
+    low = 0;
+    high = count - keep;
+  } else {
+    // compact the back of the buffer
+    low = keep;
+    high = count;
+  }
+  return (high << 32) + low;
+}
+
+// Rounds the value to the nearest even integer: half of it is rounded, then doubled.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::nearest_even(double value) {
+  const uint32_t half = static_cast<uint32_t>(std::round(value / 2));
+  return 2 * half;
+}
+
+// Copies every other item of [from, to) into a new vector.
+// With odds == false the items at offsets 0, 2, 4, ... are taken, otherwise those at 1, 3, 5, ...
+template<typename T, bool H, typename C, typename A>
+template<typename Iter>
+std::vector<T, A> req_compactor<T, H, C, A>::get_evens_or_odds(Iter from, Iter to, bool odds) {
+  std::vector<T, A> picked;
+  Iter cur = from;
+  // skip the first item when the odd offsets are wanted
+  if (odds && cur != to) ++cur;
+  while (cur != to) {
+    picked.push_back(*cur);
+    ++cur;
+    // step over the item that is dropped, unless the range ended
+    if (cur != to) ++cur;
+  }
+  return picked;
+}
+
+// sketch
+
+// Creates an empty sketch and adds the first compactor.
+// NOTE(review): k is not validated here (req_constants::MIN_K is not enforced) - confirm whether it should be.
+// NOTE(review): the allocator is stored but not handed to compactors_ - confirm this is intended for the draft.
+template<typename T, bool H, typename C, typename S, typename A>
+req_sketch<T, H, C, S, A>::req_sketch(uint32_t k, const A& allocator):
+allocator_(allocator),
+k_(k),
+max_nom_size_(0),
+num_retained_(0),
+n_(0)
+{
+ // creates level 0 and sets max_nom_size_
+ grow();
+}
+
+// Returns true as long as update() has not been called.
+template<typename T, bool H, typename C, typename S, typename A>
+bool req_sketch<T, H, C, S, A>::is_empty() const {
+ return n_ == 0;
+}
+
+// Returns the number of items passed to update() so far.
+template<typename T, bool H, typename C, typename S, typename A>
+uint64_t req_sketch<T, H, C, S, A>::get_n() const {
+ return n_;
+}
+
+// Returns the cached number of items held across all compactors.
+template<typename T, bool H, typename C, typename S, typename A>
+uint32_t req_sketch<T, H, C, S, A>::get_num_retained() const {
+ return num_retained_;
+}
+
+// Returns true once the sketch has more than one compactor, i.e. after a level was added by compress().
+template<typename T, bool H, typename C, typename S, typename A>
+bool req_sketch<T, H, C, S, A>::is_estimation_mode() const {
+ return compactors_.size() > 1;
+}
+
+// Adds one item to the sketch.
+// The item goes into the level 0 compactor; when the number of retained items
+// reaches the nominal capacity of the sketch, level 0 is sorted and the sketch is compressed.
+// The commented-out lines are parts of the Java version that are not ported yet.
+template<typename T, bool H, typename C, typename S, typename A>
+template<typename FwdT>
+void req_sketch<T, H, C, S, A>::update(FwdT&& item) {
+//  if (Float.isNaN(item)) { return; }
+//  if (isEmpty()) {
+//    minValue = item;
+//    maxValue = item;
+//  } else {
+//    if (item < minValue) { minValue = item; }
+//    if (item > maxValue) { maxValue = item; }
+//  }
+  // forward the value category: a named forwarding reference is an lvalue,
+  // so without std::forward an rvalue argument would be copied instead of moved
+  compactors_[0].append(std::forward<FwdT>(item));
+  ++num_retained_;
+  ++n_;
+  if (num_retained_ == max_nom_size_) {
+    compactors_[0].sort();
+    compress();
+  }
+  // aux = null;
+}
+
+// Adds a new top level: a compactor whose lg_weight equals the number of levels
+// that existed before the call, then refreshes the nominal capacity of the sketch.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::grow() {
+  const uint8_t new_level = get_num_levels();
+  compactors_.emplace_back(new_level, k_);
+  update_max_nom_size();
+}
+
+// Returns the number of compactors, narrowed to 8 bits.
+template<typename T, bool H, typename C, typename S, typename A>
+uint8_t req_sketch<T, H, C, S, A>::get_num_levels() const {
+  return static_cast<uint8_t>(compactors_.size());
+}
+
+// Recomputes max_nom_size_ as the sum of the nominal capacities of all levels.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::update_max_nom_size() {
+  uint32_t total = 0;
+  for (const auto& level: compactors_) {
+    total += level.get_nom_capacity();
+  }
+  max_nom_size_ = total;
+}
+
+// Recomputes num_retained_ as the sum of the item counts of all levels.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::update_num_retained() {
+  uint32_t total = 0;
+  for (const auto& level: compactors_) {
+    total += level.get_num_items();
+  }
+  num_retained_ = total;
+}
+
+// Walks the levels bottom-up and compacts every level that reached its nominal capacity,
+// merging the promoted items into the level above. Stops early as soon as the sketch
+// holds fewer items than its nominal capacity.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::compress() {
+  // levels are addressed by index: grow() may reallocate compactors_
+  for (size_t level = 0; level < compactors_.size(); ++level) {
+    const bool is_full = compactors_[level].get_num_items() >= compactors_[level].get_nom_capacity();
+    if (!is_full) continue;
+    if (level + 1 >= get_num_levels()) { // at the top?
+      grow(); // add a level, increases max_nom_size
+    }
+    auto promoted = compactors_[level].compact();
+    compactors_[level + 1].merge_sort_in(std::move(promoted));
+    update_num_retained();
+    if (num_retained_ < max_nom_size_) break;
+  }
+  update_max_nom_size();
+  // aux = null;
+}
+
+// Builds a human-readable summary of the sketch.
+// print_levels adds one line per level with its nominal capacity and item count;
+// print_items only adds the header and footer of the data section for now.
+template<typename T, bool H, typename C, typename S, typename A>
+string<A> req_sketch<T, H, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  // text form of a flag
+  const auto flag_text = [](bool flag) -> const char* { return flag ? "true" : "false"; };
+
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+  os << "### REQ sketch summary:" << std::endl;
+  os << " K : " << k_ << std::endl;
+  os << " N : " << n_ << std::endl;
+  os << " Empty : " << flag_text(is_empty()) << std::endl;
+  os << " Estimation mode: " << flag_text(is_estimation_mode()) << std::endl;
+  os << " Levels : " << compactors_.size() << std::endl;
+  os << " Sorted : " << flag_text(compactors_[0].is_sorted()) << std::endl;
+  os << " Capacity items : " << max_nom_size_ << std::endl;
+  os << " Retained items : " << num_retained_ << std::endl;
+//  os << " Storage bytes : " << get_serialized_size_bytes() << std::endl;
+//  if (!is_empty()) {
+//    os << " Min value : " << *min_value_ << std::endl;
+//    os << " Max value : " << *max_value_ << std::endl;
+//  }
+  os << "### End sketch summary" << std::endl;
+
+  if (print_levels) {
+    os << "### REQ sketch levels:" << std::endl;
+    os << " index: nominal capacity, actual size" << std::endl;
+    for (uint8_t level = 0; level < compactors_.size(); ++level) {
+      const auto& compactor = compactors_[level];
+      // the index is widened so that it prints as a number, not as a character
+      os << " " << static_cast<unsigned int>(level) << ": "
+         << compactor.get_nom_capacity() << ", "
+         << compactor.get_num_items() << std::endl;
+    }
+    os << "### End sketch levels" << std::endl;
+  }
+
+  if (print_items) {
+    os << "### REQ sketch data:" << std::endl;
+    os << "### End sketch data" << std::endl;
+  }
+  return os.str();
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/test/CMakeLists.txt b/req/test/CMakeLists.txt
new file mode 100755
index 0000000..42a1509
--- /dev/null
+++ b/req/test/CMakeLists.txt
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Unit tests of the REQ sketch.
+add_executable(req_test)
+
+target_link_libraries(req_test req common_test)
+
+set_target_properties(req_test PROPERTIES
+  CXX_STANDARD 11
+  CXX_STANDARD_REQUIRED YES
+)
+
+# Directory of the test sources, with a trailing slash, handed to the tests as
+# TEST_BINARY_INPUT_PATH. The variable that is filled must be the one that is
+# expanded below; previously a differently named variable was set and the
+# definition ended up as an empty string.
+file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" REQ_TEST_BINARY_PATH)
+string(APPEND REQ_TEST_BINARY_PATH "/")
+target_compile_definitions(req_test
+  PRIVATE
+    TEST_BINARY_INPUT_PATH="${REQ_TEST_BINARY_PATH}"
+)
+
+add_test(
+  NAME req_test
+  COMMAND req_test
+)
+
+target_sources(req_test
+  PRIVATE
+    req_sketch_test.cpp
+)
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
new file mode 100755
index 0000000..56098b0
--- /dev/null
+++ b/req/test/req_sketch_test.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+
+#include <req_sketch.hpp>
+
+namespace datasketches {
+
+// Directory prefix for test input files: the TEST_BINARY_INPUT_PATH compile definition
+// when it is provided, "test/" otherwise. Not referenced by the tests below.
+#ifdef TEST_BINARY_INPUT_PATH
+const std::string inputPath = TEST_BINARY_INPUT_PATH;
+#else
+const std::string inputPath = "test/";
+#endif
+
+// A freshly constructed sketch reports empty, exact mode, and zero counts.
+TEST_CASE("req sketch: empty", "[req_sketch]") {
+ req_sketch<float, true> sketch(100);
+ REQUIRE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 0);
+ REQUIRE(sketch.get_num_retained() == 0);
+}
+
+// After one update the sketch is non-empty, still exact, and retains that one item.
+TEST_CASE("req sketch: single value", "[req_sketch]") {
+ req_sketch<float, true> sketch(100);
+ sketch.update(1);
+ REQUIRE_FALSE(sketch.is_empty());
+ REQUIRE_FALSE(sketch.is_estimation_mode());
+ REQUIRE(sketch.get_n() == 1);
+ REQUIRE(sketch.get_num_retained() == 1);
+}
+
+// 100 updates stay below the capacity of a k=100 sketch: every item is retained.
+TEST_CASE("req sketch: exact mode", "[req_sketch]") {
+  const size_t n = 100;
+  req_sketch<float, true> sketch(100);
+  for (size_t i = 0; i < n; ++i) {
+    sketch.update(i);
+  }
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == n);
+  REQUIRE(sketch.get_num_retained() == n);
+}
+
+// 1250 updates push a k=100 sketch past its capacity: it must compact,
+// add a level and retain fewer items than it has seen.
+TEST_CASE("req sketch: estimation mode", "[req_sketch]") {
+  std::cout << "estimation mode test\n";
+  const size_t n = 1250;
+  req_sketch<float, true> sketch(100);
+  for (size_t i = 0; i < n; ++i) {
+    sketch.update(i);
+  }
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == n);
+  // dump the summary and the levels for manual inspection
+  std::cout << sketch.to_string(true, true);
+  REQUIRE(sketch.get_num_retained() < n);
+}
+
+} /* namespace datasketches */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org