You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/10/22 20:47:38 UTC

[incubator-datasketches-cpp] 01/01: draft, no rank and quantile queries yet

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit d46a046bfc82cb0a8d785e49331d9a6be96cf19f
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Thu Oct 22 13:47:21 2020 -0700

    draft, no rank and quantile queries yet
---
 CMakeLists.txt                  |   1 +
 req/CMakeLists.txt              |  48 +++++++
 req/include/req_sketch.hpp      | 145 +++++++++++++++++++++
 req/include/req_sketch_impl.hpp | 281 ++++++++++++++++++++++++++++++++++++++++
 req/test/CMakeLists.txt         |  42 ++++++
 req/test/req_sketch_test.cpp    |  70 ++++++++++
 6 files changed, 587 insertions(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff1d4a5..d3c1fef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -96,6 +96,7 @@ add_subdirectory(fi)
 add_subdirectory(theta)
 add_subdirectory(sampling)
 add_subdirectory(tuple)
+# REQ sketch: INTERFACE library target "req" (its tests are added when BUILD_TESTS is set)
+add_subdirectory(req)
 
 if (WITH_PYTHON)
   add_subdirectory(python)
diff --git a/req/CMakeLists.txt b/req/CMakeLists.txt
new file mode 100755
index 0000000..b3cfae3
--- /dev/null
+++ b/req/CMakeLists.txt
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(req INTERFACE)
+
+# Namespaced alias for consumers: ${PROJECT_NAME}::REQ.
+# Each sketch family needs its own alias name; reusing another family's alias
+# (e.g. ::THETA, presumably already defined by theta/) makes add_library fail.
+add_library(${PROJECT_NAME}::REQ ALIAS req)
+
+if (BUILD_TESTS)
+  add_subdirectory(test)
+endif()
+
+target_include_directories(req
+  INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+    $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
+)
+
+target_link_libraries(req INTERFACE common)
+target_compile_features(req INTERFACE cxx_std_11)
+
+# req_sketch.hpp includes req_sketch_impl.hpp, so both headers must be installed.
+set(req_HEADERS "")
+list(APPEND req_HEADERS "include/req_sketch.hpp")
+list(APPEND req_HEADERS "include/req_sketch_impl.hpp")
+
+install(TARGETS req
+  EXPORT ${PROJECT_NAME}
+)
+
+install(FILES ${req_HEADERS}
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches")
+
+target_sources(req
+  INTERFACE
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/req_sketch_impl.hpp
+)
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
new file mode 100755
index 0000000..53039aa
--- /dev/null
+++ b/req/include/req_sketch.hpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_HPP_
+#define REQ_SKETCH_HPP_
+
+#include "serde.hpp"
+#include "common_defs.hpp"
+
+namespace datasketches {
+
+// TODO: have a common random bit with KLL
+// Engine yielding one random bit (0 or 1) per call; supplies the random coin flips
+// in req_compactor::compact().
+// The result type is uint32_t rather than uint8_t because (a) the standard only allows
+// unsigned short/int/long/long long as a random engine's UIntType, and (b) the seeding
+// constructor takes a result_type, so an 8-bit type would truncate the time-based seed
+// to one of only 256 values.
+// NOTE(review): being `static` in a header, every translation unit gets its own engine.
+static std::independent_bits_engine<std::mt19937, 1, uint32_t> req_random_bit(
+  static_cast<uint32_t>(std::chrono::system_clock::now().time_since_epoch().count()));
+
+namespace req_constants {
+  // smallest section size a compactor may shrink to when its sections double
+  static constexpr uint32_t MIN_K = 4;
+  // number of sections a newly constructed compactor starts with
+  static constexpr uint32_t INIT_NUM_SECTIONS = 3;
+}
+
+/**
+ * One level of a REQ sketch: a buffer of items plus the state driving its compaction schedule.
+ * @tparam T type of item
+ * @tparam IsHighRank if true, compact() takes items from the low end of the sorted buffer
+ *         (leaving the highest items uncompacted); otherwise from the high end
+ * @tparam Comparator strict weak ordering used to sort and merge items
+ * @tparam Allocator allocator type of the item buffer
+ */
+template<
+typename T,
+bool IsHighRank,
+typename Comparator,
+typename Allocator
+>
+class req_compactor {
+public:
+  /**
+   * @param lg_weight presumably log2 of the weight of items at this level
+   *        (req_sketch passes the level index) - TODO confirm
+   * @param section_size initial section size; nominal capacity is 2 * sections * section size
+   */
+  req_compactor(uint8_t lg_weight, uint32_t section_size);
+
+  /// true unless items were appended after the last sort()
+  bool is_sorted() const;
+  /// number of items currently in the buffer
+  uint32_t get_num_items() const;
+  /// nominal capacity: 2 * number of sections * section size
+  uint32_t get_nom_capacity() const;
+
+  /// appends an item and marks the buffer as unsorted
+  template<typename FwdT>
+  void append(FwdT&& item);
+
+  /// sorts the buffer using Comparator
+  void sort();
+  /// merges items (expected to be sorted) into this buffer; throws std::logic_error if the buffer is unsorted
+  void merge_sort_in(std::vector<T, Allocator>&& items);
+
+  /// removes a range of items from the buffer and returns every other one of them for promotion
+  std::vector<T, Allocator> compact();
+
+private:
+  uint8_t lg_weight_;
+  bool coin_; // random bit for compaction
+  bool sorted_;
+  double section_size_raw_; // unrounded section size; divided by sqrt(2) when sections double
+  uint32_t section_size_; // initially the constructor argument, later nearest_even(section_size_raw_)
+  uint32_t num_sections_;
+  uint32_t num_compactions_;
+  uint32_t state_; // state of the deterministic compaction schedule
+  std::vector<T, Allocator> items_;
+
+  bool ensure_enough_sections();
+  size_t compute_compaction_range(uint32_t secs_to_compact) const;
+
+  static uint32_t nearest_even(double value);
+
+  template<typename Iter>
+  static std::vector<T, Allocator> get_evens_or_odds(Iter from, Iter to, bool odds);
+};
+
+/**
+ * Draft of the REQ sketch; rank and quantile queries are not implemented yet.
+ * @tparam T type of item
+ * @tparam IsHighRank passed to each req_compactor (selects which end of a level is compacted)
+ * @tparam Comparator strict weak ordering of items
+ * @tparam SerDe serializer/deserializer of items (not used yet)
+ * @tparam Allocator allocator
+ */
+template<
+  typename T,
+  bool IsHighRank,
+  typename Comparator = std::less<T>,
+  typename SerDe = serde<T>,
+  typename Allocator = std::allocator<T>
+>
+class req_sketch {
+public:
+  using Compactor = req_compactor<T, IsHighRank, Comparator, Allocator>;
+
+  /**
+   * Constructor
+   * @param k section size given to every new compactor (level)
+   * @param allocator instance of an Allocator
+   */
+  explicit req_sketch(uint32_t k, const Allocator& allocator = Allocator());
+
+  /**
+   * Returns true if this sketch is empty.
+   * @return empty flag
+   */
+  bool is_empty() const;
+
+  /**
+   * Returns the length of the input stream.
+   * @return stream length
+   */
+  uint64_t get_n() const;
+
+  /**
+   * Returns the number of retained items in the sketch.
+   * @return number of retained items
+   */
+  uint32_t get_num_retained() const;
+
+  /**
+   * Returns true if this sketch is in estimation mode.
+   * @return estimation mode flag
+   */
+  bool is_estimation_mode() const;
+
+  /**
+   * Updates this sketch with a given item.
+   * The item goes into the level-0 compactor; compression runs when the number of
+   * retained items reaches the nominal capacity of all levels.
+   * @param item to update the sketch with
+   */
+  template<typename FwdT>
+  void update(FwdT&& item);
+
+  /**
+   * Prints a summary of the sketch.
+   * @param print_levels if true include information about levels
+   * @param print_items if true include sketch data
+   */
+  string<Allocator> to_string(bool print_levels = false, bool print_items = false) const;
+
+private:
+  Allocator allocator_;
+  uint32_t k_;
+  uint32_t max_nom_size_; // sum of nominal capacities of all levels
+  uint32_t num_retained_; // sum of items held by all levels
+  uint64_t n_;            // number of updates
+  using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
+  std::vector<Compactor, AllocCompactor> compactors_; // index == level
+
+  uint8_t get_num_levels() const;
+  void grow();
+  void update_max_nom_size();
+  void update_num_retained();
+  void compress();
+};
+
+} /* namespace datasketches */
+
+#include "req_sketch_impl.hpp"
+
+#endif
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
new file mode 100755
index 0000000..30c2a86
--- /dev/null
+++ b/req/include/req_sketch_impl.hpp
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef REQ_SKETCH_IMPL_HPP_
+#define REQ_SKETCH_IMPL_HPP_
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+#include <iterator>
+#include <sstream>
+#include <stdexcept>
+#include <vector>
+
+#include "count_zeros.hpp"
+
+namespace datasketches {
+
+// Creates an empty level with INIT_NUM_SECTIONS sections of the given size.
+template<typename T, bool H, typename C, typename A>
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size):
+  lg_weight_(lg_weight),
+  coin_(false),
+  sorted_(true),
+  section_size_raw_(static_cast<double>(section_size)),
+  section_size_(section_size),
+  num_sections_(req_constants::INIT_NUM_SECTIONS),
+  num_compactions_(0),
+  state_(0)
+{
+  // an empty buffer is trivially sorted and the compaction schedule starts at zero
+}
+
+// Whether the buffer is known to be in Comparator order: set by the constructor
+// and sort(), cleared by append().
+template<typename T, bool H, typename C, typename A>
+bool req_compactor<T, H, C, A>::is_sorted() const { return sorted_; }
+
+// Number of items currently held in this level's buffer.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::get_num_items() const {
+  return static_cast<uint32_t>(items_.size());
+}
+
+// Nominal capacity: twice the number of items covered by all sections.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::get_nom_capacity() const {
+  const uint32_t section_items = num_sections_ * section_size_;
+  return 2 * section_items;
+}
+
+// Adds one item to the end of the buffer (forwarding it, so rvalues are moved in).
+template<typename T, bool H, typename C, typename A>
+template<typename FwdT>
+void req_compactor<T, H, C, A>::append(FwdT&& item) {
+  items_.push_back(std::forward<FwdT>(item));
+  // the new item may be out of order, so the buffer can no longer be assumed sorted
+  sorted_ = false;
+}
+
+// Sorts the buffer in Comparator order and records that it is sorted.
+template<typename T, bool H, typename C, typename A>
+void req_compactor<T, H, C, A>::sort() {
+  C comparator{};
+  std::sort(items_.begin(), items_.end(), comparator);
+  sorted_ = true;
+}
+
+// Merges `items` (expected to be sorted by C, e.g. the output of compact() on a lower
+// level) into this buffer, which must already be sorted.
+// @throws std::logic_error if this buffer is not sorted
+template<typename T, bool H, typename C, typename A>
+void req_compactor<T, H, C, A>::merge_sort_in(std::vector<T, A>&& items) {
+  if (!sorted_) throw std::logic_error("compactor must be sorted at this point");
+  if (items.empty()) return;
+  // Remember the boundary as an offset, not as end(): appending invalidates the
+  // past-the-end iterator even when no reallocation happens.
+  const size_t old_size = items_.size();
+  items_.reserve(old_size + items.size());
+  items_.insert(items_.end(), std::make_move_iterator(items.begin()), std::make_move_iterator(items.end()));
+  // Both halves are ordered by C, so they must be merged with C as well.
+  std::inplace_merge(items_.begin(), items_.begin() + old_size, items_.end(), C());
+}
+
+// Removes a range of the (sorted) buffer and returns every other item of it, starting
+// with the first or second item depending on coin_, for promotion to the next level.
+// Advances the compaction schedule and may double the number of sections.
+// @throws std::logic_error if the computed range holds fewer than 2 items
+template<typename T, bool H, typename C, typename A>
+std::vector<T, A> req_compactor<T, H, C, A>::compact() {
+  // choose a part of the buffer to compact: one section more than the number of
+  // trailing 1-bits of state_, capped at num_sections_
+  const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1), num_sections_);
+  // compute_compaction_range() packs [from, to) as (to << 32) + from; unpack in 64 bits
+  // so the shift below is well defined whatever the width of size_t
+  const uint64_t compaction_range = compute_compaction_range(secs_to_compact);
+  const uint32_t compact_from = static_cast<uint32_t>(compaction_range & 0xFFFFFFFFULL); // low 32
+  const uint32_t compact_to = static_cast<uint32_t>(compaction_range >> 32); // high 32
+  if (compact_to < compact_from || compact_to - compact_from < 2) throw std::logic_error("compaction range error");
+
+  // when num_compactions_ is odd reuse the previous coin inverted, otherwise flip a random one
+  if ((num_compactions_ & 1) == 1) { coin_ = !coin_; }
+  else { coin_ = req_random_bit(); }
+
+  // the range is erased right after, so move the promoted items out instead of copying them
+  const auto first = items_.begin() + compact_from;
+  const auto last = items_.begin() + compact_to;
+  auto promote = get_evens_or_odds(std::make_move_iterator(first), std::make_move_iterator(last), coin_);
+  items_.erase(first, last);
+
+  ++num_compactions_;
+  ++state_;
+  ensure_enough_sections();
+  return promote;
+}
+
+// Doubles the number of sections (shrinking the section size by sqrt(2)) once enough
+// compactions have happened, unless the new section size would drop below MIN_K.
+// @return true if the sections were doubled
+template<typename T, bool H, typename C, typename A>
+bool req_compactor<T, H, C, A>::ensure_enough_sections() {
+  const double ssr = section_size_raw_ / std::sqrt(2.0);
+  const uint32_t ne = nearest_even(ssr);
+  // The threshold is 2^(num_sections_ - 1). num_sections_ keeps doubling (3, 6, 12, 24, 48, ...),
+  // so the exponent can reach 32 or more, where `1 << exponent` on an int is undefined (in
+  // practice x86 masks the count to 5 bits, which would trigger a premature doubling).
+  // Shift in 64 bits instead; any exponent >= 32 yields a threshold a uint32_t counter
+  // can never reach.
+  const uint32_t exponent = num_sections_ - 1;
+  const bool enough_compactions = exponent < 32 && num_compactions_ >= (static_cast<uint64_t>(1) << exponent);
+  if (enough_compactions && ne >= req_constants::MIN_K) {
+    section_size_raw_ = ssr;
+    section_size_ = ne;
+    num_sections_ <<= 1;
+    //ensure_capacity(2 * get_nom_capacity());
+    return true;
+  }
+  return false;
+}
+
+// Returns the half-open index range [from, to) of items_ to compact, packed as (to << 32) + from.
+// The part left alone is half the nominal capacity plus the sections not being compacted,
+// bumped by one when needed so that the compacted range has an even length.
+// With H (high rank) the compacted range starts at index 0, otherwise it ends at the last item.
+// NOTE(review): packing into size_t assumes a 64-bit size_t; on 32-bit targets `to << 32`
+// is undefined. Returning uint64_t (declaration and definition together) would fix this.
+template<typename T, bool H, typename C, typename A>
+size_t req_compactor<T, H, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
+  const uint32_t count = items_.size();
+  const uint32_t keep_half = get_nom_capacity() / 2;
+  const uint32_t keep_sections = (num_sections_ - secs_to_compact) * section_size_;
+  uint32_t keep = keep_half + keep_sections;
+  if (((count - keep) & 1) == 1) keep += 1;
+  size_t from;
+  size_t to;
+  if (H) {
+    from = 0;
+    to = count - keep;
+  } else {
+    from = keep;
+    to = count;
+  }
+  return (to << 32) + from;
+}
+
+// Rounds value to an even integer: half of it rounded to the nearest integer, doubled.
+template<typename T, bool H, typename C, typename A>
+uint32_t req_compactor<T, H, C, A>::nearest_even(double value) {
+  const uint32_t half = static_cast<uint32_t>(round(value / 2));
+  return half * 2;
+}
+
+// Returns every other item of [from, to): those at even offsets (0, 2, ...) when `odds`
+// is false, or at odd offsets (1, 3, ...) when it is true. Items are taken via `*it`,
+// so passing move iterators moves them out instead of copying.
+template<typename T, bool H, typename C, typename A>
+template<typename Iter>
+std::vector<T, A> req_compactor<T, H, C, A>::get_evens_or_odds(Iter from, Iter to, bool odds) {
+  std::vector<T, A> picked;
+  bool take = !odds;
+  for (Iter it = from; it != to; ++it) {
+    if (take) picked.push_back(*it);
+    take = !take;
+  }
+  return picked;
+}
+
+// sketch
+
+// Creates an empty sketch with a single (level 0) compactor of section size k.
+// The supplied allocator is also used (rebound) for the vector of compactors.
+template<typename T, bool H, typename C, typename S, typename A>
+req_sketch<T, H, C, S, A>::req_sketch(uint32_t k, const A& allocator):
+allocator_(allocator),
+k_(k),
+max_nom_size_(0),
+num_retained_(0),
+n_(0),
+compactors_(AllocCompactor(allocator))
+{
+  grow(); // adds level 0 and sets max_nom_size_
+}
+
+// The sketch is empty until the first update (n_ counts every update).
+template<typename T, bool H, typename C, typename S, typename A>
+bool req_sketch<T, H, C, S, A>::is_empty() const { return get_n() == 0; }
+
+// Total number of updates seen by the sketch.
+template<typename T, bool H, typename C, typename S, typename A>
+uint64_t req_sketch<T, H, C, S, A>::get_n() const { return n_; }
+
+// Cached total of items held across all levels.
+template<typename T, bool H, typename C, typename S, typename A>
+uint32_t req_sketch<T, H, C, S, A>::get_num_retained() const { return num_retained_; }
+
+// A second level exists only after level 0 has been compacted at least once.
+template<typename T, bool H, typename C, typename S, typename A>
+bool req_sketch<T, H, C, S, A>::is_estimation_mode() const { return compactors_.size() >= 2; }
+
+// Appends an item to level 0 and compresses the sketch when it reaches its nominal size.
+template<typename T, bool H, typename C, typename S, typename A>
+template<typename FwdT>
+void req_sketch<T, H, C, S, A>::update(FwdT&& item) {
+// TODO: NaN filtering and min/max tracking, per this reference code (apparently from the Java version):
+//  if (Float.isNaN(item)) { return; }
+//  if (isEmpty()) {
+//    minValue = item;
+//    maxValue = item;
+//  } else {
+//    if (item < minValue) { minValue = item; }
+//    if (item > maxValue) { maxValue = item; }
+//  }
+  // forward so that rvalues are moved into the level-0 buffer rather than copied
+  compactors_[0].append(std::forward<FwdT>(item));
+  ++num_retained_;
+  ++n_;
+  // >= rather than ==: if num_retained_ ever overshot max_nom_size_, an equality
+  // test would never trigger compression again
+  if (num_retained_ >= max_nom_size_) {
+    compactors_[0].sort();
+    compress();
+  }
+  //  aux = null;
+}
+
+// Adds a compactor on top; its lg_weight is its level index (the number of levels
+// before it is added). Refreshes max_nom_size_ to include the new level's capacity.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::grow() {
+  compactors_.push_back(Compactor(get_num_levels(), k_));
+  update_max_nom_size();
+}
+
+// Number of compactors (levels), narrowed to uint8_t.
+template<typename T, bool H, typename C, typename S, typename A>
+uint8_t req_sketch<T, H, C, S, A>::get_num_levels() const {
+  return static_cast<uint8_t>(compactors_.size());
+}
+
+// Recomputes max_nom_size_ as the sum of the nominal capacities of all levels.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::update_max_nom_size() {
+  uint32_t total = 0;
+  for (auto it = compactors_.cbegin(); it != compactors_.cend(); ++it) total += it->get_nom_capacity();
+  max_nom_size_ = total;
+}
+
+// Recomputes num_retained_ as the total number of items over all levels.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::update_num_retained() {
+  uint32_t total = 0;
+  for (auto it = compactors_.cbegin(); it != compactors_.cend(); ++it) total += it->get_num_items();
+  num_retained_ = total;
+}
+
+// Walks the levels from the bottom, compacting every level at or above its nominal
+// capacity and merging the promoted items into the next level (adding a new top level
+// first when needed). Stops early once the retained item count drops below max_nom_size_.
+// NOTE(review): max_nom_size_ is only refreshed by grow() and at the end, so the early-exit
+// check may compare against a stale total when compact() changed a level's capacity.
+template<typename T, bool H, typename C, typename S, typename A>
+void req_sketch<T, H, C, S, A>::compress() {
+  // compactors_.size() is re-read every iteration because grow() may add a level
+  for (size_t h = 0; h < compactors_.size(); ++h) {
+    if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
+      if (h + 1 >= get_num_levels()) { // at the top?
+        grow(); // add a level, increases max_nom_size
+      }
+      // index again after grow(): push_back may have reallocated compactors_
+      auto promoted = compactors_[h].compact();
+      compactors_[h + 1].merge_sort_in(std::move(promoted));
+      update_num_retained();
+      if (num_retained_ < max_nom_size_) break;
+    }
+  }
+  update_max_nom_size();
+  // aux = null;
+}
+
+// Builds a human-readable summary; optionally adds per-level capacity and size.
+template<typename T, bool H, typename C, typename S, typename A>
+string<A> req_sketch<T, H, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  // '\n' rather than std::endl: flushing an in-memory string stream is pointless
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+  os << "### REQ sketch summary:" << '\n';
+  os << "   K              : " << k_ << '\n';
+  os << "   N              : " << n_ << '\n';
+  os << "   Empty          : " << (is_empty() ? "true" : "false") << '\n';
+  os << "   Estimation mode: " << (is_estimation_mode() ? "true" : "false") << '\n';
+  os << "   Levels         : " << compactors_.size() << '\n';
+  os << "   Sorted         : " << (compactors_[0].is_sorted() ? "true" : "false") << '\n';
+  os << "   Capacity items : " << max_nom_size_ << '\n';
+  os << "   Retained items : " << num_retained_ << '\n';
+//  os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
+//  if (!is_empty()) {
+//    os << "   Min value      : " << *min_value_ << std::endl;
+//    os << "   Max value      : " << *max_value_ << std::endl;
+//  }
+  os << "### End sketch summary" << '\n';
+
+  if (print_levels) {
+    os << "### REQ sketch levels:" << '\n';
+    os << "   index: nominal capacity, actual size" << '\n';
+    // size_t index (no C-style cast needed); a uint8_t counter would wrap past 255 levels
+    for (size_t i = 0; i < compactors_.size(); ++i) {
+      os << "   " << i << ": "
+        << compactors_[i].get_nom_capacity() << ", "
+        << compactors_[i].get_num_items() << '\n';
+    }
+    os << "### End sketch levels" << '\n';
+  }
+
+  if (print_items) {
+    os << "### REQ sketch data:" << '\n';
+    // TODO: print retained items (req_compactor does not expose its buffer yet)
+    os << "### End sketch data" << '\n';
+  }
+  return os.str();
+}
+
+} /* namespace datasketches */
+
+#endif
diff --git a/req/test/CMakeLists.txt b/req/test/CMakeLists.txt
new file mode 100755
index 0000000..42a1509
--- /dev/null
+++ b/req/test/CMakeLists.txt
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_executable(req_test)
+
+target_link_libraries(req_test req common_test)
+
+set_target_properties(req_test PROPERTIES
+  CXX_STANDARD 11
+  CXX_STANDARD_REQUIRED YES
+)
+
+# Directory of test input files, exposed to the tests as TEST_BINARY_INPUT_PATH.
+# The variable set here must be the same one expanded below, otherwise the define is empty.
+file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" REQ_TEST_BINARY_PATH)
+string(APPEND REQ_TEST_BINARY_PATH "/")
+target_compile_definitions(req_test
+  PRIVATE
+    TEST_BINARY_INPUT_PATH="${REQ_TEST_BINARY_PATH}"
+)
+
+add_test(
+  NAME req_test
+  COMMAND req_test
+)
+
+target_sources(req_test
+  PRIVATE
+    req_sketch_test.cpp
+)
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
new file mode 100755
index 0000000..56098b0
--- /dev/null
+++ b/req/test/req_sketch_test.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+
+#include <req_sketch.hpp>
+
+namespace datasketches {
+
+// Directory holding binary test inputs; TEST_BINARY_INPUT_PATH comes from req/test/CMakeLists.txt,
+// with "test/" as the fallback when the macro is not defined.
+// NOTE(review): not used by any test in this file yet.
+#ifdef TEST_BINARY_INPUT_PATH
+const std::string inputPath = TEST_BINARY_INPUT_PATH;
+#else
+const std::string inputPath = "test/";
+#endif
+
+TEST_CASE("req sketch: empty", "[req_sketch]") {
+  // a freshly constructed sketch has seen and retained nothing
+  req_sketch<float, true> sketch(100);
+  REQUIRE(sketch.get_n() == 0);
+  REQUIRE(sketch.get_num_retained() == 0);
+  REQUIRE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+}
+
+TEST_CASE("req sketch: single value", "[req_sketch]") {
+  // one update: non-empty, exact, and the item is retained
+  req_sketch<float, true> sketch(100);
+  sketch.update(1);
+  REQUIRE(sketch.get_n() == 1);
+  REQUIRE(sketch.get_num_retained() == 1);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+}
+
+TEST_CASE("req sketch: exact mode", "[req_sketch]") {
+  // 100 items stay below the level-0 capacity, so everything is retained
+  req_sketch<float, true> sketch(100);
+  for (int i = 0; i < 100; ++i) sketch.update(static_cast<float>(i));
+  REQUIRE(sketch.get_n() == 100);
+  REQUIRE(sketch.get_num_retained() == 100);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+}
+
+TEST_CASE("req sketch: estimation mode", "[req_sketch]") {
+  // enough items to force compaction of level 0 and growth of a second level
+  req_sketch<float, true> sketch(100);
+  const size_t n = 1250;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == n);
+  REQUIRE(sketch.get_num_retained() < n);
+  // exercise the summary printer without writing to stdout
+  REQUIRE_FALSE(sketch.to_string(true, true).empty());
+}
+
+} /* namespace datasketches */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org