You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/10/12 16:43:22 UTC

incubator-quickstep git commit: Added ProbabilityStore class

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master f820c45ee -> 8f094a1c0


Added ProbabilityStore class

- Used to store probabilities of objects.
- Probabilities are of two kinds: Individual and cumulative.
- All the individual probabilities within the store add up to one.
- Support for finding the object with given cumulative probability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8f094a1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8f094a1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8f094a1c

Branch: refs/heads/master
Commit: 8f094a1c086445b79d6dba36f81326ac06050209
Parents: f820c45
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Sep 29 15:38:42 2017 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Oct 11 10:38:36 2017 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  13 +
 query_execution/ProbabilityStore.hpp            | 263 +++++++++++++++++++
 .../tests/ProbabilityStore_unittest.cpp         | 106 ++++++++
 3 files changed, 382 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f094a1c/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8f797f7..791434a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -40,6 +40,7 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
 endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
+add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -201,6 +202,9 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       quickstep_utility_Macros
                       tmb
                       ${GFLAGS_LIB_NAME})
+target_link_libraries(quickstep_queryexecution_ProbabilityStore
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_QueryContext
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -372,6 +376,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_ForemanSingleNode
                       quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_queryexecution_PolicyEnforcerSingleNode
+                      quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto
@@ -425,6 +430,14 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif(ENABLE_DISTRIBUTED)
 
+add_executable(ProbabilityStore_unittest
+        "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
+target_link_libraries(ProbabilityStore_unittest
+                      gtest
+                      gtest_main
+                      quickstep_queryexecution_ProbabilityStore)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+
 add_executable(QueryManagerSingleNode_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
 target_link_libraries(QueryManagerSingleNode_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f094a1c/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
new file mode 100644
index 0000000..079f60b
--- /dev/null
+++ b/query_execution/ProbabilityStore.hpp
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <random>
+#include <unordered_map>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief A class that stores the probabilities of objects. We use an integer field
+ *        called "key" to identify each object.
+ *        A probability is expressed as a fraction. All the objects share a common denominator.
+ **/
+class ProbabilityStore {
+ public:
+  /**
+   * @brief Constructor. Creates an empty store whose common denominator is 1
+   *        and seeds the random engine from std::random_device.
+   **/
+  ProbabilityStore()
+      : common_denominator_(1.0), dist_(0.0, 1.0), engine_(std::random_device()()) {}
+
+  /**
+   * @brief Get the number of objects in the store.
+   **/
+  inline std::size_t getNumObjects() const {
+    DCHECK_EQ(individual_probabilities_.size(), cumulative_probabilities_.size());
+    return individual_probabilities_.size();
+  }
+
+  /**
+   * @brief Get the common denominator.
+   *
+   * @note The denominator is stored as a float. The returned value is
+   *       converted to std::size_t, so any fractional part is dropped.
+   */
+  inline std::size_t getDenominator() const {
+    return static_cast<std::size_t>(common_denominator_);
+  }
+
+  /**
+   * @brief Check if an object with a given key is present.
+   * @param key The key of the given object.
+   * @return True if the object is present, false otherwise.
+   */
+  inline bool hasObject(const std::size_t key) const {
+    return (individual_probabilities_.find(key) != individual_probabilities_.end());
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probability for a given object with
+   *        updated denominator.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note If the key is already present, its numerator is overwritten.
+   *
+   * @param key The key of the given object.
+   * @param numerator The numerator for the given object. Must not exceed
+   *        new_denominator.
+   * @param new_denominator The updated denominator for the store. Must be
+   *        strictly positive.
+   **/
+  void addOrUpdateObjectNewDenominator(const std::size_t key,
+                                       const float numerator,
+                                       const float new_denominator) {
+    // Compare against a float literal so that both operands have the same type.
+    CHECK_GT(new_denominator, 0.0f);
+    DCHECK_LE(numerator, new_denominator);
+    common_denominator_ = new_denominator;
+    addOrUpdateObjectHelper(key, numerator);
+  }
+
+  /**
+   * @brief Add or update multiple objects with a new denominator.
+   *
+   * @note keys and numerators must have the same size; numerators[i] is
+   *       stored for keys[i].
+   *
+   * @param keys The keys to be added/updated.
+   * @param numerators The numerators to be added/updated.
+   * @param new_denominator The new denominator. Must be strictly positive.
+   */
+  void addOrUpdateObjectsNewDenominator(
+      const std::vector<std::size_t> &keys,
+      const std::vector<float> &numerators,
+      const float new_denominator) {
+    // Compare against a float literal so that both operands have the same type.
+    CHECK_GT(new_denominator, 0.0f);
+    common_denominator_ = new_denominator;
+    addOrUpdateObjectsHelper(keys, numerators);
+  }
+
+  /**
+   * @brief Remove an object from the store.
+   *
+   * @note This function decrements the denominator with the value of the
+   *       numerator being removed, and recomputes the cumulative
+   *       probabilities of the remaining objects.
+   * @note The object must be present in the store; this is only verified by
+   *       a debug-mode check.
+   *
+   * @param key The key of the object to be removed.
+   **/
+  void removeObject(const std::size_t key) {
+    auto individual_it = individual_probabilities_.find(key);
+    DCHECK(individual_it != individual_probabilities_.end());
+    // Read the numerator before erasing, as erase invalidates the iterator.
+    const float new_denominator = common_denominator_ - individual_it->second;
+    individual_probabilities_.erase(individual_it);
+    common_denominator_ = new_denominator;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Get the individual probability (not cumulative) for an object.
+   *
+   * @note The object must be present in the store; this is only verified by
+   *       a debug-mode check.
+   *
+   * @param key The key of the object.
+   *
+   * @return The numerator of the object divided by the common denominator.
+   **/
+  float getIndividualProbability(const std::size_t key) const {
+    const auto it = individual_probabilities_.find(key);
+    DCHECK(it != individual_probabilities_.end());
+    DCHECK_NE(0.0, common_denominator_);
+    return it->second / common_denominator_;
+  }
+
+  /**
+   * @brief Update the cumulative probabilities.
+   *
+   * @note This function should be called upon if there are any updates,
+   *       additions or deletions to the individual probabilities.
+   * @note An efficient implementation should be written if there are large
+   *       number of objects.
+   **/
+  void updateCumulativeProbabilities() {
+    cumulative_probabilities_.clear();
+    cumulative_probabilities_.reserve(individual_probabilities_.size());
+    float cumulative_probability = 0;
+    // Iterate by reference to avoid copying every (key, numerator) pair.
+    for (const auto &key_and_numerator : individual_probabilities_) {
+      cumulative_probability += key_and_numerator.second / common_denominator_;
+      cumulative_probabilities_.emplace_back(key_and_numerator.first,
+                                             cumulative_probability);
+    }
+    if (!cumulative_probabilities_.empty()) {
+      // Adjust the last cumulative probability manually to 1, so that
+      // floating addition related rounding issues are ignored.
+      cumulative_probabilities_.back().updateProbability(1);
+    }
+  }
+
+  /**
+   * @brief Return a randomly chosen key, where each key is picked in
+   *        proportion to its individual probability.
+   *
+   * @note A value is drawn from the uniform distribution over [0, 1) and
+   *       mapped to a key through the cumulative probabilities.
+   * @note Although this method is const, it advances the internal random
+   *       engine. The store must not be empty.
+   **/
+  inline std::size_t pickRandomKey() const {
+    return getKeyForProbability(dist_(engine_));
+  }
+
+ private:
+  // A (key, cumulative probability) pair stored in cumulative_probabilities_.
+  struct ProbabilityInfo {
+   public:
+    ProbabilityInfo(const std::size_t key, const float probability)
+        : key_(key), probability_(probability) {
+      DCHECK(probability_ <= 1.0);
+    }
+
+    void updateProbability(const float new_probability) {
+      probability_ = new_probability;
+    }
+
+    const std::size_t key_;
+    float probability_;
+  };
+
+  /**
+   * @brief Get a key for a given cumulative probability.
+   *
+   * @param key_cumulative_probability The input cumulative probability.
+   *
+   * @return The key of the first object whose cumulative probability is
+   *         strictly greater than the input cumulative probability. If no
+   *         such object exists (e.g. the input is 1.0 or more), the key of
+   *         the last object is returned.
+   **/
+  inline std::size_t getKeyForProbability(
+      const float key_cumulative_probability) const {
+    DCHECK(!cumulative_probabilities_.empty());
+    // It doesn't matter in which order the objects are arranged in the
+    // cumulative_probabilities_ vector.
+    const auto it = std::upper_bound(
+        cumulative_probabilities_.begin(),
+        cumulative_probabilities_.end(),
+        key_cumulative_probability,
+        [](const float probability, const ProbabilityInfo &info) {
+          return probability < info.probability_;
+        });
+    if (it == cumulative_probabilities_.end()) {
+      // The last cumulative probability is always 1, so we only get here when
+      // the input is not strictly below 1. Some standard library
+      // implementations can return the upper bound from a float uniform
+      // distribution; map that case to the last object instead of
+      // dereferencing the end iterator.
+      return cumulative_probabilities_.back().key_;
+    }
+    return it->key_;
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probability for a given object.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param key The key of the given object.
+   * @param numerator The numerator for the given object.
+   **/
+  void addOrUpdateObjectHelper(const std::size_t key,
+                               const float numerator) {
+    DCHECK_LE(numerator, common_denominator_);
+    individual_probabilities_[key] = numerator;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probabilities for given objects.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param keys A vector of keys to be added.
+   * @param numerators The numerators of the given objects.
+   **/
+  void addOrUpdateObjectsHelper(const std::vector<std::size_t> &keys,
+                                const std::vector<float> &numerators) {
+    DCHECK_EQ(keys.size(), numerators.size());
+    for (std::size_t i = 0; i < keys.size(); ++i) {
+      DCHECK_LE(numerators[i], common_denominator_);
+      individual_probabilities_[keys[i]] = numerators[i];
+    }
+    // Recompute once after all the numerators have been written.
+    updateCumulativeProbabilities();
+  }
+
+  // Key = key of the object.
+  // Value = Numerator of the object.
+  std::unordered_map<std::size_t, float> individual_probabilities_;
+  // One entry per object, in increasing order of cumulative probability.
+  std::vector<ProbabilityInfo> cumulative_probabilities_;
+
+  float common_denominator_;
+
+  // Mutable so that the const method pickRandomKey() can draw random numbers.
+  mutable std::uniform_real_distribution<float> dist_;
+  mutable std::default_random_engine engine_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f094a1c/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
new file mode 100644
index 0000000..e333f60
--- /dev/null
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_execution/ProbabilityStore.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <numeric>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+// Checks that the number of objects tracks additions and removals.
+TEST(ProbabilityStoreTest, CountTest) {
+  ProbabilityStore store;
+  EXPECT_EQ(0u, store.getNumObjects());
+  const std::size_t kKey = 0;
+  store.addOrUpdateObjectNewDenominator(kKey, 1, 1);
+  EXPECT_EQ(1u, store.getNumObjects());
+  store.removeObject(kKey);
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  const std::vector<std::size_t> objects {3, 5, 7, 9};
+  const std::vector<float> numerators {1, 2, 2, 5};
+  // Use a float initial value so that std::accumulate sums in float instead
+  // of converting every partial sum to int.
+  const float kNewDenominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  EXPECT_EQ(objects.size(), store.getNumObjects());
+}
+
+// Checks that each object's individual probability equals its numerator
+// divided by the common denominator.
+TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
+  ProbabilityStore store;
+  const std::vector<std::size_t> objects {3, 5, 7, 9};
+  const std::vector<float> numerators {1, 2, 2, 5};
+  // Use a float initial value so that std::accumulate sums in float instead
+  // of converting every partial sum to int.
+  const float kNewDenominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    // Floating point results are compared with a tolerance, not bit-for-bit.
+    EXPECT_FLOAT_EQ(numerators[object_num] / kNewDenominator,
+                    store.getIndividualProbability(objects[object_num]));
+  }
+}
+
+// Checks that every randomly picked key belongs to an object that is still in
+// the store, while objects are removed one at a time.
+TEST(ProbabilityStoreTest, PickRandomKeyTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  const std::vector<float> numerators {1, 2, 2, 5};
+  // Use a float initial value so that std::accumulate sums in float instead
+  // of converting every partial sum to int.
+  const float kNewDenominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  const std::size_t kNumTrials = 10;
+  while (!objects.empty()) {
+    for (std::size_t trial_num = 0; trial_num < kNumTrials; ++trial_num) {
+      const std::size_t picked_key = store.pickRandomKey();
+      const auto it = std::find(objects.begin(), objects.end(), picked_key);
+      EXPECT_TRUE(it != objects.end())
+          << "Picked key " << picked_key << " is not present in the store";
+    }
+    // Remove the last remaining object from both the store and the list of
+    // expected keys before the next round of trials.
+    const std::size_t key_to_be_removed = objects.back();
+    store.removeObject(key_to_be_removed);
+    objects.pop_back();
+    EXPECT_EQ(objects.size(), store.getNumObjects());
+  }
+}
+
+// Checks that removing an object lowers the denominator by that object's
+// numerator and rescales the probabilities of the remaining objects.
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> numerators {1, 2, 2, 5};
+  // Use a float initial value so that std::accumulate sums in float instead
+  // of converting every partial sum to int.
+  const float kNewDenominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_FLOAT_EQ(numerators[object_num] / kNewDenominator,
+                    store.getIndividualProbability(objects[object_num]));
+  }
+
+  // Remove last object "9", with numerator 5.
+  store.removeObject(objects.back());
+  objects.pop_back();
+  numerators.pop_back();
+  const float expected_new_denominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+
+  // getDenominator() returns std::size_t, so compare like with like.
+  EXPECT_EQ(static_cast<std::size_t>(expected_new_denominator),
+            store.getDenominator());
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_FLOAT_EQ(numerators[object_num] / expected_new_denominator,
+                    store.getIndividualProbability(objects[object_num]));
+  }
+}
+
+}  // namespace quickstep