You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/05 15:43:43 UTC

[01/21] incubator-quickstep git commit: Disallow negative number of worker threads. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/scheduler++ d80ad4d6a -> 557ac9a33 (forced update)


Disallow negative number of worker threads.

- Fixed a bug so that the Quickstep command line now disallows a negative
  number of worker threads.
- If the user provides zero or fewer worker threads, we switch to the
  default number of worker threads instead of terminating the process.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/31f1bbb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/31f1bbb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/31f1bbb1

Branch: refs/heads/scheduler++
Commit: 31f1bbb1c71d9a18af27ee540c83f513125b656f
Parents: 040a511
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 11:04:29 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Jul 3 23:20:00 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/31f1bbb1/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3f99130..02a55a0 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -200,19 +200,18 @@ int main(int argc, char* argv[]) {
   // that we computed above, provided it did return a valid value.
   // TODO(jmp): May need to change this at some point to keep one thread
   //            available for the OS if the hardware concurrency level is high.
-  const unsigned int real_num_workers = quickstep::FLAGS_num_workers != 0
-                                      ? quickstep::FLAGS_num_workers
-                                      : (num_hw_threads != 0 ?
-                                         num_hw_threads
-                                         : 1);
-
-  if (real_num_workers > 0) {
-    printf("Starting Quickstep with %d worker thread(s) and a %.2f GB buffer pool\n",
-           real_num_workers,
-           (static_cast<double>(quickstep::FLAGS_buffer_pool_slots) * quickstep::kSlotSizeBytes)/quickstep::kAGigaByte);
-  } else {
-    LOG(FATAL) << "Quickstep needs at least one worker thread to run";
+  if (quickstep::FLAGS_num_workers <= 0) {
+    LOG(INFO) << "Quickstep expects at least one worker thread, switching to "
+                 "the default number of worker threads";
   }
+  const int real_num_workers = quickstep::FLAGS_num_workers > 0
+                                   ? quickstep::FLAGS_num_workers
+                                   : (num_hw_threads != 0 ? num_hw_threads : 1);
+
+  DCHECK_GT(real_num_workers, 0);
+  printf("Starting Quickstep with %d worker thread(s) and a %.2f GB buffer pool\n",
+         real_num_workers,
+         (static_cast<double>(quickstep::FLAGS_buffer_pool_slots) * quickstep::kSlotSizeBytes)/quickstep::kAGigaByte);
 
 #ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
   if (quickstep::FLAGS_use_hdfs) {


[06/21] incubator-quickstep git commit: Created a class for storing probabilities.

Posted by hb...@apache.org.
Created a class for storing probabilities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/347d0397
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/347d0397
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/347d0397

Branch: refs/heads/scheduler++
Commit: 347d0397c90029a6d6bbb86d59710101ccf7b922
Parents: be92212
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 21 11:45:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  13 ++
 query_execution/PolicyEnforcer.cpp              |   1 +
 query_execution/ProbabilityStore.hpp            | 223 +++++++++++++++++++
 .../tests/ProbabilityStore_unittest.cpp         |  75 +++++++
 4 files changed, 312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/347d0397/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fcd4f48..18ae0da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -36,6 +36,7 @@ add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionSt
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -97,6 +98,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManager
@@ -106,6 +108,9 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_relationaloperators_WorkOrder
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_ProbabilityStore
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_QueryContext
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -252,6 +257,14 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
+add_executable(ProbabilityStore_unittest
+  "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
+target_link_libraries(ProbabilityStore_unittest
+                      gtest
+                      gtest_main
+                      quickstep_queryexecution_ProbabilityStore)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest) 
+
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
 target_link_libraries(QueryManager_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/347d0397/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
 #include "query_execution/WorkerDirectory.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/347d0397/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
new file mode 100644
index 0000000..8343d24
--- /dev/null
+++ b/query_execution/ProbabilityStore.hpp
@@ -0,0 +1,223 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <random>
+#include <unordered_map>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief A class that stores the probabilities of objects. We use a field
+ *        called "property" to identify each object.
+ **/
+class ProbabilityStore {
+ public:
+  /**
+   * @brief Constructor. Seeds the internal std::mt19937_64 engine from
+   *        std::random_device.
+   **/
+  ProbabilityStore()
+      : mt_(std::random_device()()) {}
+
+  /**
+   * @brief Get the number of objects in the store.
+   **/
+  std::size_t getNumObjects() const {
+    DCHECK_EQ(individual_probabilities_.size(), cumulative_probabilities_.size());
+    return individual_probabilities_.size();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probability for a given object.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param individual_probability The individual (not cumulative) probability
+   *        of the given object.
+   **/
+  void addProbability(const std::size_t property,
+                      const float individual_probability) {
+    individual_probabilities_[property] = individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probabilities for given objects.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param properties A vector of properties to be added.
+   * @param individual_probabilities The individual (not cumulative)
+   *        probabilities of the given objects, parallel to properties.
+   **/
+  void addProbabilities(const std::vector<std::size_t> &properties,
+                        const std::vector<float> &individual_probabilities) {
+    DCHECK_EQ(properties.size(), individual_probabilities.size());
+    for (std::size_t i = 0; i < properties.size(); ++i) {
+      individual_probabilities_[properties[i]] = individual_probabilities[i];
+    }
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Update the probability of a given object to a new value.
+   *
+   * @note The object must already be present in the store (DCHECKed).
+   *
+   * @param property The property of the object.
+   * @param new_individual_probability The new probability to be set.
+   **/
+  void updateProbability(const std::size_t property,
+                         const float new_individual_probability) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    it->second = new_individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Remove an object from the store.
+   *
+   * @note The object must be present in the store (DCHECKed).
+   *
+   * @param property The property of the object to be removed.
+   **/
+  void removeObject(const std::size_t property) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    individual_probabilities_.erase(it);
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Get the individual probability (not cumulative) for an object.
+   *
+   * @param property The property of the object.
+   **/
+  float getIndividualProbability(const std::size_t property) const {
+    const auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Update the cumulative probabilities.
+   *
+   * @note This function should be called whenever individual probabilities
+   *       are updated, added or removed.
+   * @note Each entry stores the running sum of the individual probabilities
+   *       up to and including its own object (clamped to 1.0), so an object
+   *       owns the interval [previous entry's value (0 for the first), its
+   *       own value). The last entry is forced to exactly 1.0.
+   * @note An efficient implementation should be written if there are large
+   *       number of objects.
+   **/
+  void updateCumulativeProbabilities() {
+    cumulative_probabilities_.clear();
+    if (individual_probabilities_.empty()) {
+      // No need to modify the cumulative probabilities.
+      return;
+    }
+    float cumulative_probability = 0;
+    for (const auto &property_probability_pair : individual_probabilities_) {
+      // Accumulate this object's share *before* recording it; recording the
+      // sum beforehand would give the first object an empty interval, so it
+      // could never be picked.
+      cumulative_probability += property_probability_pair.second;
+      // Clamp so rounding in the running sum cannot trip the DCHECK in
+      // ProbabilityInfo.
+      cumulative_probabilities_.emplace_back(
+          property_probability_pair.first,
+          std::min(cumulative_probability, 1.0f));
+    }
+    // Adjust the last cumulative probability manually to 1.0, so that floating
+    // addition related rounding issues are ignored.
+    cumulative_probabilities_.back().updateProbability(1.0);
+  }
+
+  /**
+   * @brief Return a randomly chosen property.
+   *
+   * @note A number is drawn uniformly from [0, 1) and mapped to the object
+   *       whose cumulative-probability interval contains it. Assuming the
+   *       individual probabilities sum to 1, each object is therefore picked
+   *       with its individual probability; otherwise the last object absorbs
+   *       the remainder.
+   **/
+  inline std::size_t pickRandomProperty() {
+    std::uniform_real_distribution<float> dist(0.0, 1.0);
+    const float chosen_probability = dist(mt_);
+    return getPropertyForProbability(chosen_probability);
+  }
+
+ private:
+  /**
+   * @brief One (property, cumulative probability) entry of
+   *        cumulative_probabilities_.
+   **/
+  class ProbabilityInfo {
+   public:
+    ProbabilityInfo(const std::size_t property, const float probability)
+        : property_(property), probability_(probability) {
+      DCHECK_LE(probability, 1.0);
+    }
+
+    ProbabilityInfo(const ProbabilityInfo &other) = default;
+
+    ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
+
+    void updateProbability(const float new_probability) {
+      DCHECK_LE(new_probability, 1.0);
+      probability_ = new_probability;
+    }
+
+    std::size_t property_;
+    float probability_;
+  };
+
+  /**
+   * @brief Get a property for a given cumulative probability.
+   *
+   * @param key_cumulative_probability The input cumulative probability,
+   *        expected to be in [0, 1].
+   *
+   * @return The property of the first object (in cumulative_probabilities_
+   *         order) whose cumulative probability is strictly greater than the
+   *         input, or the last object if there is none (the input is >= 1.0).
+   **/
+  inline std::size_t getPropertyForProbability(
+      const float key_cumulative_probability) {
+    DCHECK(!cumulative_probabilities_.empty());
+    // It doesn't matter in which order the objects are arranged in the
+    // cumulative_probabilities_ vector, as long as the stored cumulative
+    // values are non-decreasing (which updateCumulativeProbabilities ensures
+    // for non-negative individual probabilities).
+    ProbabilityInfo search_key(0, key_cumulative_probability);
+    const auto it = std::upper_bound(
+        cumulative_probabilities_.begin(),
+        cumulative_probabilities_.end(),
+        search_key,
+        [](const ProbabilityInfo &a, const ProbabilityInfo &b) {
+          return a.probability_ < b.probability_;
+        });
+    if (it == cumulative_probabilities_.end()) {
+      // The key is not below the final value of 1.0. This can happen when
+      // std::uniform_real_distribution<float> rounds up to its upper bound;
+      // attribute it to the last object instead of dereferencing end().
+      return cumulative_probabilities_.back().property_;
+    }
+    return it->property_;
+  }
+
+  std::unordered_map<std::size_t, float> individual_probabilities_;
+  std::vector<ProbabilityInfo> cumulative_probabilities_;
+
+  std::mt19937_64 mt_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/347d0397/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
new file mode 100644
index 0000000..e624557
--- /dev/null
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -0,0 +1,75 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <algorithm>
+#include <cstddef>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/ProbabilityStore.hpp"
+
+namespace quickstep {
+
+// Adding and removing objects must keep the object count in sync.
+TEST(ProbabilityStoreTest, CountTest) {
+  ProbabilityStore store;
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  // A single object can be added and then removed again.
+  const std::size_t kSoleProperty = 0;
+  store.addProbability(kSoleProperty, 0.5);
+  EXPECT_EQ(1u, store.getNumObjects());
+  store.removeObject(kSoleProperty);
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  // Bulk insertion registers one object per property.
+  const std::vector<std::size_t> properties {3, 5, 7, 9};
+  const std::vector<float> probs {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(properties, probs);
+  EXPECT_EQ(properties.size(), store.getNumObjects());
+}
+
+// Every inserted probability must be returned unchanged.
+TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
+  ProbabilityStore store;
+  const std::vector<std::size_t> properties {3, 5, 7, 9};
+  const std::vector<float> probs {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(properties, probs);
+
+  for (std::size_t i = 0; i < properties.size(); ++i) {
+    EXPECT_EQ(probs[i], store.getIndividualProbability(properties[i]));
+  }
+}
+
+// Sampling must only ever yield properties that are still in the store.
+TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> remaining {3, 5, 7, 9};
+  const std::vector<float> probs {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(remaining, probs);
+
+  // Sample a few times, then shrink the store by one object, until empty.
+  const std::size_t kNumTrials = 10;
+  while (!remaining.empty()) {
+    for (std::size_t trial = 0; trial < kNumTrials; ++trial) {
+      const std::size_t sampled = store.pickRandomProperty();
+      const bool is_known =
+          std::find(remaining.begin(), remaining.end(), sampled) != remaining.end();
+      EXPECT_TRUE(is_known);
+    }
+    store.removeObject(remaining.back());
+    remaining.pop_back();
+    EXPECT_EQ(remaining.size(), store.getNumObjects());
+  }
+}
+
+}  // namespace quickstep


[17/21] incubator-quickstep git commit: Bug fixed in Query Manager

Posted by hb...@apache.org.
Bug fixed in Query Manager

- Added a method to print probabilities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ec8910b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ec8910b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ec8910b

Branch: refs/heads/scheduler++
Commit: 9ec8910b41332b1131226cb98b987372a2ecd418
Parents: 9664de1
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 23:40:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp          | 18 ++++++++++++++++++
 query_execution/Learner.hpp          |  2 ++
 query_execution/ProbabilityStore.hpp | 12 ++++++++++++
 query_execution/QueryManager.cpp     |  7 ++++---
 4 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ec8910b/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 183d688..3c6f42b 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -61,6 +61,7 @@ void Learner::addCompletionFeedback(
   }
   updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
   updateProbabilitiesOfAllPriorityLevels();
+  printProbabilitiesForPriorityLevel(priority_level);
 }
 
 void Learner::updateProbabilitiesForQueriesInPriorityLevel(
@@ -268,4 +269,21 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
   }
 }
 
+// Logs the per-query probabilities of the given priority level: the learned
+// ones (current_probabilities_) once every query in the level has sent
+// feedback, the default ones (default_probabilities_) otherwise. Nothing is
+// logged when the selected store is empty.
+void Learner::printProbabilitiesForPriorityLevel(const std::size_t priority_level) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+    // Single lookup, checked via the iterator (at() would throw rather than
+    // fail the DCHECK when the level is missing).
+    const auto it = current_probabilities_.find(priority_level);
+    DCHECK(it != current_probabilities_.end());
+    DCHECK(it->second != nullptr);
+    if (it->second->getNumObjects() > 0) {
+      it->second->printIndividualProbabilities();
+    }
+  } else {
+    const auto it = default_probabilities_.find(priority_level);
+    DCHECK(it != default_probabilities_.end());
+    DCHECK(it->second != nullptr);
+    if (it->second->getNumObjects() > 0) {
+      it->second->printIndividualProbabilities();
+    }
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ec8910b/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 024dc9b..6cea325 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -470,6 +470,8 @@ class Learner {
     return true;
   }
 
+  void printProbabilitiesForPriorityLevel(const std::size_t priority_level);
+
   // Key = Priority level, value = A vector of pairs.
   // Each pair:
   // 1st element: Query ID.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ec8910b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 4ae085e..bcb2c0d 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <random>
+#include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -234,6 +235,17 @@ class ProbabilityStore {
     return getPropertyForProbability(chosen_probability);
   }
 
+  /**
+   * @brief Log all objects as a single INFO line of the form
+   *        "property:value|property:value|...".
+   *
+   * @note The logged value is the second component of each stored value
+   *       (presumably the individual probability; confirm against the
+   *       value type of individual_probabilities_).
+   **/
+  void printIndividualProbabilities() const {
+    std::string result;
+    // Bind by reference: copying each map entry per iteration is wasted work.
+    for (const auto &prob_pair : individual_probabilities_) {
+      result += std::to_string(prob_pair.first);
+      result += ":";
+      result += std::to_string(prob_pair.second.second);
+      result += "|";
+    }
+    LOG(INFO) << result;
+  }
+
  private:
   class ProbabilityInfo {
    public:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ec8910b/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index d20b592..3be5ee4 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -238,16 +238,17 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
       LOG(FATAL) << "Unknown message type found in QueryManager";
   }
 
+  QueryStatusCode result = QueryStatusCode::kNone;
   if (query_exec_state_->hasExecutionFinished(op_index)) {
-    return QueryStatusCode::kOperatorExecuted;
+    result = QueryStatusCode::kOperatorExecuted;
   }
 
   // As kQueryExecuted takes precedence over kOperatorExecuted, we check again.
   if (query_exec_state_->hasQueryExecutionFinished()) {
-    return QueryStatusCode::kQueryExecuted;
+    result = QueryStatusCode::kQueryExecuted;
   }
 
-  return QueryStatusCode::kNone;
+  return result;
 }
 
 void QueryManager::processFeedbackMessage(


[10/21] incubator-quickstep git commit: Fixed getDenominator method. More unit tests.

Posted by hb...@apache.org.
Fixed getDenominator method. More unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c17529a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c17529a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c17529a9

Branch: refs/heads/scheduler++
Commit: c17529a9ce263a0b143d5c79922e95ce88cd8f99
Parents: 996ca75
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/ExecutionStats.hpp         |  15 +-
 query_execution/Learner.cpp                |  46 +++--
 query_execution/Learner.hpp                | 147 ++++++++++++--
 query_execution/ProbabilityStore.hpp       |  15 +-
 query_execution/QueryExecutionTypedefs.hpp |   1 +
 query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
 6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
   }
 
   /**
-   * @brief Check if there are any stats present.
+   * @brief Check if there are stats present for at least one active operator.
    **/
   inline bool hasStats() const {
     for (auto it = active_operators_.begin();
          it != active_operators_.end();
          ++it) {
-      if (!it->second->hasStatsForOperator()) {
-        return false;
+      if (it->second->hasStatsForOperator()) {
+        return true;
       }
     }
-    return true;
+    return false;
   }
 
   /**
@@ -109,14 +109,13 @@ class ExecutionStats {
    * @param operator_index The operator index which the value belongs to.
    **/
   void addEntry(std::size_t value, std::size_t operator_index) {
-    if (hasOperator(operator_index)) {
-      // This is not the first entry for the given operator.
-      active_operators_[operator_index]->addEntry(value);
-    } else {
+    if (!hasOperator(operator_index)) {
+      // This is the first entry for the given operator.
       // Create the OperatorStats object for this operator.
       active_operators_[operator_index] =
           std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
     }
+    active_operators_[operator_index]->addEntry(value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     DCHECK(current_probabilities_[priority_level] != nullptr);
     // As we want the probability of the lone query in this priority level as
     // 1, we set the numerator same as denominator.
-    const std::size_t numerator =
-        current_probabilities_[priority_level]->getDenominator();
-    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
-                                                              numerator);
+    // TODO(harshad) - Get the mean work order times here too and use that as
+    // the numerator.
+    ExecutionStats *stats = getExecutionStats(query_id);
+    auto query_stats = stats->getCurrentStats();
+    /*const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();*/
+    if (query_stats.second != 0) {
+      const float mean_workorder_time =
+          query_stats.first / static_cast<float>(query_stats.second);
+      if (mean_workorder_time != 0) {
+        current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+            query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+      }
+    }
     return;
   }
   // Else, there are more than one queries for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     // queries.
     DCHECK(mean_workorders_per_query.find(query_id) !=
            mean_workorders_per_query.end());
+    DCHECK_NE(mean_workorders_per_query[query_id], 0);
     current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
-        query_id, mean_workorders_per_query[query_id], denominator);
+        query_id,
+        1 / static_cast<float>(mean_workorders_per_query[query_id]),
+        denominator);
+    // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use
     // the older probabilities.
+    // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+    return;
   }
 }
 
 void Learner::updateProbabilitiesOfAllPriorityLevels() {
-  if (!hasFeedbackFromAllPriorityLevels() ||
-      has_feedback_from_all_queries_.empty()) {
+  if (!hasFeedbackFromAllPriorityLevels()) {
+      // has_feedback_from_all_queries_.empty()) {
+      // NOTE(harshad) : Not using this cache as it gets confusing.
     // Either we don't have enough feedback messages from all the priority
     // levels OR there are no active queries in the system.
     return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
   // Compute the predicted work order execution times for all the level.
   std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
   std::size_t sum_active_priorities = 0;
-  for (auto priority_iter : has_feedback_from_all_queries_) {
+  for (auto priority_iter = execution_stats_.begin();
+       priority_iter != execution_stats_.end();
+       ++priority_iter) {
     std::size_t total_time_curr_level = 0;
-    const std::size_t curr_priority_level = priority_iter.first;
+    const std::size_t curr_priority_level = priority_iter->first;
     sum_active_priorities += curr_priority_level;
     // For each query, find its predicted work order execution time.
     const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   for (auto priority_iter = execution_stats_.cbegin();
        priority_iter != execution_stats_.cend();
        ++priority_iter) {
+    DCHECK(!priority_iter->second.empty());
     const std::size_t curr_priority_level = priority_iter->first;
     sum_priority_levels += curr_priority_level;
     priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
           new ExecutionStats(FLAGS_max_past_entries_learner)));
   // As we are initializing the query, we obviously haven't gotten any
   // feedback message for this query. Hence mark the following field as false.
-  has_feedback_from_all_queries_[priority_level] = false;
+  // has_feedback_from_all_queries_[priority_level] = false;
+  // NOTE(harshad) : Not using this cache as it gets confusing.
 }
 
 void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     execution_stats_.erase(priority_level);
     current_probabilities_.erase(priority_level);
     probabilities_of_priority_levels_->removeObject(priority_level);
-    has_feedback_from_all_queries_.erase(priority_level);
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // has_feedback_from_all_queries_.erase(priority_level);
+    // LOG(INFO) << "Removed priority level: " << priority_level;
     if (hasActiveQueries()) {
       if (static_cast<int>(priority_level) == highest_priority_level_) {
         // The priority level to be removed is the highest priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
       const serialization::NormalWorkOrderCompletionMessage
           &workorder_completion_proto);
 
+  /**
+   * @brief Add a query to the Learner.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
   void addQuery(const QueryHandle &query_handle) {
     initializePriorityLevelIfNotPresent(query_handle.query_priority());
     initializeQuery(query_handle);
     relearn();
   }
 
+  /**
+   * @brief Remove a query from the Learner.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
   void removeQuery(const std::size_t query_id) {
-    // Find the iterator to the query in execution_stats_.
     DCHECK(isQueryPresent(query_id));
     const std::size_t priority_level = getQueryPriority(query_id);
+    // Find the iterator to the query in execution_stats_.
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
     DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
       // current_probabilities_[priority_level] ProbabilityStore.
       current_probabilities_[priority_level]->removeObject(query_id);
     }
+    // has_feedback_from_all_queries_[priority_level] = false;
     query_id_to_priority_lookup_.erase(query_id);
     checkAndRemovePriorityLevel(priority_level);
     relearn();
   }
 
-  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+  /**
+   * @brief Remove the stats of a given operator in a given query.
+   **/
+  void removeOperator(const std::size_t query_id,
+                      const std::size_t operator_id) {
     ExecutionStats *stats = getExecutionStats(query_id);
     DCHECK(stats != nullptr);
     stats->removeOperator(operator_id);
   }
 
+  /**
+   * @brief Reset the probabilities and start learning again.
+   **/
   void relearn() {
     if (hasActiveQueries()) {
       initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Check if there are any active queries in the Learner.
+   **/
   inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
 
+  /**
+   * @brief Get the number of active queries in the Learner for the given
+   *        priority level.
+   **/
   inline const std::size_t getNumActiveQueriesInPriorityLevel(
       const std::size_t priority_level) const {
     const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Get the total number of active queries in the Learner.
+   **/
   inline const std::size_t getTotalNumActiveQueries() const {
     return query_id_to_priority_lookup_.size();
   }
@@ -115,6 +146,83 @@ class Learner {
     return highest_priority_level_;
   }
 
+  /**
+   * @brief Randomly pick a priority level.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A priority level. If no queries are present in the learner, return
+   *         kInvalidPriorityLevel.
+   **/
+  inline const int pickRandomPriorityLevel() const {
+    if (hasActiveQueries()) {
+      const int result = static_cast<int>(
+          probabilities_of_priority_levels_->pickRandomProperty());
+      /*LOG(INFO) << "Random priority level: " << result << " has "
+                << current_probabilities_.find(result)->second->getNumObjects()
+                << " queries";*/
+      return result;
+    } else {
+      return kInvalidPriorityLevel;
+    }
+  }
+
+  /**
+   * @brief Randomly pick a query from any priority level in the learner.
+   *
+   * @note We use uniform random distribution.
+   *
+   * @return A query ID. If no queries are present in the learner, return
+   *         kInvalidQueryID.
+   **/
+  inline const int pickRandomQuery() const {
+    if (hasActiveQueries()) {
+      const int random_priority_level = pickRandomPriorityLevel();
+      // Note : The valid priority level values are non-zero.
+      DCHECK_GT(random_priority_level, 0);
+      const int result = pickRandomQueryFromPriorityLevel(
+          static_cast<std::size_t>(random_priority_level));
+      // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
+      return result;
+    } else {
+      // LOG(INFO) << "No active query right now";
+      return kInvalidQueryID;
+    }
+  }
+
+  /**
+   * @brief Randomly pick a query from a given priority level in the learner.
+   * @param priority_level The priority level to pick from (must be present).
+   * @note The pick is weighted by the level's current or default probabilities.
+   *
+   * @return A query ID. If no queries are present for this priority level in
+   *         the learner, return kInvalidQueryID.
+   **/
+  inline const int pickRandomQueryFromPriorityLevel(
+      const std::size_t priority_level) const {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (hasActiveQueries()) {
+      if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+        const auto it = current_probabilities_.find(priority_level);
+        DCHECK(it != current_probabilities_.end() && it->second != nullptr);
+        if (it->second->getNumObjects() > 0) {
+          return static_cast<int>(
+              it->second->pickRandomProperty());
+        }
+        // Empty store: fall through and report kInvalidQueryID.
+      } else {
+        const auto it = default_probabilities_.find(priority_level);
+        DCHECK(it != default_probabilities_.end() && it->second != nullptr);
+        if (it->second->getNumObjects() > 0) {
+          return static_cast<int>(
+              it->second->pickRandomProperty());
+        }
+        // Empty store: fall through and report kInvalidQueryID.
+      }
+    }
+    return kInvalidQueryID;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
    **/
   inline bool hasFeedbackFromAllQueriesInPriorityLevel(
       const std::size_t priority_level) const {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // return has_feedback_from_all_queries_.at(priority_level);
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_.at(priority_level);
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
   inline void updateFeedbackFromQueriesInPriorityLevel(
       const std::size_t priority_level) {
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
-        &stats_vector = execution_stats_.at(priority_level);
+        &stats_vector = execution_stats_[priority_level];
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
       DCHECK(stats_vector[i].second != nullptr);
       if (!stats_vector[i].second->hasStats()) {
         // At least one query has no statistics so far.
+        // NOTE(harshad) : Not using this cache as it gets confusing.
+        // has_feedback_from_all_queries_[priority_level] = false;
         return;
       }
     }
     // All the queries have at least one execution statistic.
-    has_feedback_from_all_queries_[priority_level] = true;
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // has_feedback_from_all_queries_[priority_level] = true;
   }
 
   /**
@@ -313,31 +426,36 @@ class Learner {
   }
 
   /**
-   * @param mean_workorder_per_query A vector of pairs in which:
-   *        1st element is mean time per work order
-   *        2nd element is the query ID.
+   * @param mean_workorder_per_query An unordered_map in which:
+   *        1st element is the query ID.
+   *        2nd element is mean time per work order
    *
    * @note If any query has mean work order time as 0, we return 0 as the
    *       denominator.
    *
    * @return The denominator to be used for probability calculations.
    **/
-  inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
-                                        &mean_workorder_per_query) const {
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
     float denominator = 0;
     for (const auto &element : mean_workorder_per_query) {
       if (element.second != 0) {
         denominator += 1/static_cast<float>(element.second);
-      } else {
-        return 0;
+      // Queries whose mean is 0 are now skipped instead of forcing a 0
+      // denominator, so the @note above no longer describes this code.
       }
     }
     return denominator;
   }
 
   inline bool hasFeedbackFromAllPriorityLevels() const {
-    for (auto feedback : has_feedback_from_all_queries_) {
-      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+    // for (auto feedback : has_feedback_from_all_queries_) {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    for (auto priority_iter = default_probabilities_.cbegin();
+         priority_iter != default_probabilities_.cend();
+         ++priority_iter) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
         return false;
       }
     }
@@ -369,9 +487,10 @@ class Learner {
   // ProbabilityStrore for probabilities mapped to the priority levels.
   std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
 
+  // NOTE(harshad) : Not using this cache as it gets confusing.
   // Key = priority level. Value = A boolean that indicates if we have received
   // feedback from all the queries in the given priority level.
-  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+  // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
   int highest_priority_level_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : common_denominator_(1.0), mt_(std::random_device()()) {}
+      : common_denominator_(1.0) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
   /**
    * @brief Return a randomly chosen property.
    *
+   * TODO(harshad) - If it is expensive to create the random device
+   * on every invocation of this function, make it a class variable.
+   * In which case, we won't be able to mark the function as const.
+   *
    * @note The random number is uniformly chosen.
    **/
-  inline const std::size_t pickRandomProperty() {
+  inline const std::size_t pickRandomProperty() const {
     std::uniform_real_distribution<float> dist(0.0, 1.0);
-    const float chosen_probability = dist(mt_);
+    std::random_device rd;
+    const float chosen_probability = dist(rd);
     return getPropertyForProbability(chosen_probability);
   }
 
@@ -260,7 +265,7 @@ class ProbabilityStore {
    *         or equal to the input cumulative probability.
    **/
   inline const std::size_t getPropertyForProbability(
-      const float key_cumulative_probability) {
+      const float key_cumulative_probability) const {
     DCHECK(!cumulative_probabilities_.empty());
     // It doesn't matter in which order the objects are arranged in the
     // cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
 
   float common_denominator_;
 
-  std::mt19937_64 mt_;
-
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
 const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
 
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c17529a9/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
 
   // Randomize the orders.
   std::random_device rd;
-  std::mt19937 g(rd());
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
 
   std::shuffle(priorities_insertion_order.begin(),
                priorities_insertion_order.end(),
-               g);
+               g1);
 
   std::shuffle(priorities_removal_order.begin(),
                priorities_removal_order.end(),
-               g);
+               g2);
 
   Learner learner;
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
 }
 
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g1);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // The picked level must be one of those inserted so far, i.e. in
+    // [begin, it]. std::find returns it + 1 (its "last") on a miss.
+    auto find_priority_level_it = std::find(
+        priorities_insertion_order.begin(), it + 1, picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != it + 1);
+  }
+
+  // Repeat the tests a few more times.
+  const std::size_t kNumTests = 200;
+  for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_insertion_order vector.
+    auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+                                            priorities_insertion_order.end(),
+                                            picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+  }
+
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_removal_order vector.
+    auto find_priority_level_it = std::find(
+        it, priorities_removal_order.end(), picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one queries for a given priority level.
+
+  // Also, in this test we don't send any completion feedback message to the
+  // learner. Therefore it always refers to the default probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // The pick must be a query inserted so far, i.e. in [begin, it].
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // std::find returns it + 1 on a miss, so compare against that.
+    EXPECT_TRUE(find_query_it != it + 1);
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one queries for a given priority level.
+
+  // In this test we send completion feedback messages for all the queries
+  // to the learner. Therefore it refers to the current probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // The pick must be a query inserted so far, i.e. in [begin, it].
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // std::find returns it + 1 on a miss, so compare against that.
+    EXPECT_TRUE(find_query_it != it + 1);
+  }
+
+  // Now send one completion feedback message per query to the learner.
+  const std::size_t kOperatorID = 0;
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    // LOG(INFO) << "Completion message for query : " << *it;
+    learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    // LOG(INFO) << "Picked query ID: " << picked_query_id << "\n";
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+    // LOG(INFO) << "Removed query ID: " << *it;
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
 }  // namespace quickstep


[09/21] incubator-quickstep git commit: Created PriorityPolicyEnforcer class.

Posted by hb...@apache.org.
Created PriorityPolicyEnforcer class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/672a1934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/672a1934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/672a1934

Branch: refs/heads/scheduler++
Commit: 672a1934bb050e1a736f08dc8c87eba5e31162a8
Parents: c64bf75
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 09:49:29 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  19 +-
 query_execution/Foreman.cpp                |   2 +-
 query_execution/Foreman.hpp                |   4 +-
 query_execution/PolicyEnforcer.cpp         |   2 -
 query_execution/PriorityPolicyEnforcer.cpp | 222 ++++++++++++++++++++++++
 query_execution/PriorityPolicyEnforcer.hpp | 222 ++++++++++++++++++++++++
 6 files changed, 465 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4639617..104f9da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PriorityPolicyEnforcer PriorityPolicyEnforcer.cpp PriorityPolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -80,7 +81,7 @@ target_link_libraries(quickstep_queryexecution_Foreman
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
-                      quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PriorityPolicyEnforcer
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_WorkerDirectory
@@ -108,6 +109,21 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_ProbabilityStore
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_WorkerDirectory
+                      quickstep_queryexecution_WorkerMessage
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer
+                      ${GFLAGS_LIB_NAME}
+                      glog
+                      quickstep_queryexecution_ExecutionStats
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_Learner
                       quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
@@ -225,6 +241,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_ForemanLite
                       quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
+                      quickstep_queryexecution_PriorityPolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index f9f2e7a..0898ac1 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -87,7 +87,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_.reset(new PolicyEnforcer(
+  policy_enforcer_.reset(new PriorityPolicyEnforcer(
       foreman_client_id_,
       num_numa_nodes,
       catalog_database_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 7be57e7..c38a3e6 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -24,7 +24,7 @@
 #include <vector>
 
 #include "query_execution/ForemanLite.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PriorityPolicyEnforcer.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -128,7 +128,7 @@ class Foreman final : public ForemanLite {
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
-  std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+  std::unique_ptr<PriorityPolicyEnforcer> policy_enforcer_;
 
   DISALLOW_COPY_AND_ASSIGN(Foreman);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index ff734ca..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,7 +25,6 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/Learner.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
@@ -43,7 +42,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
new file mode 100644
index 0000000..44ccb0a
--- /dev/null
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -0,0 +1,222 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PriorityPolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+// Admit a query if fewer than kMaxConcurrentQueries queries are active and no
+// admitted query shares its ID. Returns true only when the query was admitted;
+// returns false both when it was waitlisted and when it was rejected as a
+// duplicate ID.
+bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                           catalog_database_, storage_manager_, bus_));
+      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      // Let the member learner track the new query; it is unregistered again
+      // in removeQuery().
+      learner_->addQuery(*query_handle);
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+// Route a message received by the Foreman to the QueryManager of the query it
+// belongs to. Completion messages also update the worker directory (and, for
+// normal work orders, the learner and the optional profiler). When the query
+// finishes, it is removed and the earliest waiting query, if any, is admitted.
+void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+  // TODO(harshad) : Provide processXMessage() public functions in
+  // QueryManager, so that we need to extract message from the
+  // TaggedMessage only once.
+  // Initialized so that no path reads an indeterminate value; the default
+  // case below aborts before query_id is used.
+  std::size_t query_id = 0;
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      worker_directory_->decrementNumQueuedWorkOrders(
+          proto.worker_thread_index());
+      learner_->addCompletionFeedback(proto);
+      if (profile_individual_workorders_) {
+        recordTimeForWorkOrder(proto);
+      }
+      break;
+    }
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      // Note: This proto message contains the time it took to execute the
+      // rebuild WorkOrder. It can be accessed in this scope.
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      worker_directory_->decrementNumQueuedWorkOrders(
+          proto.worker_thread_index());
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kDataPipelineMessage: {
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+      query_id = msg.header().query_id;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer";
+  }
+  // Look the query up once. Using operator[] here would default-insert a null
+  // QueryManager for an unknown ID and then dereference it in release builds,
+  // so fail loudly instead.
+  const auto query_it = admitted_queries_.find(query_id);
+  CHECK(query_it != admitted_queries_.end())
+      << "Received a message for query " << query_id
+      << " which is not admitted";
+  const QueryManager::QueryStatusCode return_code =
+      query_it->second->processMessage(tagged_message);
+  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+    // query_it is not used past this point; removeQuery() erases the entry.
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+// Collect up to an even share of FLAGS_max_msgs_per_dispatch_round worker
+// messages from each admitted query into *worker_messages (which must be
+// empty), then remove the admitted queries whose execution has finished.
+void PriorityPolicyEnforcer::getWorkerMessages(
+    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(worker_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+  // Split the per-round budget evenly among the admitted queries. Integer
+  // division yields 0 when the budget is smaller than the number of admitted
+  // queries (e.g. --max_msgs_per_dispatch_round=1 with two queries), which
+  // would dispatch nothing in release builds; give each query at least one.
+  std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  if (per_query_share == 0) {
+    per_query_share = 1;
+  }
+  std::vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManager *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      WorkerMessage *next_worker_message =
+          curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+      if (next_worker_message != nullptr) {
+        ++messages_collected_curr_query;
+        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+      } else {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  // Removal is deferred until after the loop: erasing from admitted_queries_
+  // while iterating over it would invalidate the loop iterator.
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+// Drop an admitted query's QueryManager and unregister it from the learner.
+// Warns if the query has not finished executing (e.g. it is being killed).
+void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
+  // Use find() rather than operator[]: the latter would default-insert a null
+  // QueryManager for an unknown ID and then dereference it.
+  const auto query_it = admitted_queries_.find(query_id);
+  DCHECK(query_it != admitted_queries_.end());
+  if (query_it == admitted_queries_.end()) {
+    LOG(WARNING) << "Attempted to remove query with ID " << query_id
+                 << " which is not admitted";
+    return;
+  }
+  if (!query_it->second->getQueryExecutionState().hasQueryExecutionFinished()) {
+    LOG(WARNING) << "Removing query with ID " << query_id
+                 << " that hasn't finished its execution";
+  }
+  admitted_queries_.erase(query_it);
+  learner_->removeQuery(query_id);
+}
+
+// Offer every query in query_handles to admitQuery(), in order. Returns true
+// only if all of them were admitted right away.
+bool PriorityPolicyEnforcer::admitQueries(
+    const std::vector<QueryHandle*> &query_handles) {
+  // admitQuery() also returns false for a query it merely waitlisted, so
+  // stopping at the first false would leave every later query in the vector
+  // neither admitted nor waitlisted, i.e. silently dropped. Keep going and
+  // report the aggregate result instead.
+  bool all_admitted = true;
+  for (QueryHandle *curr_query : query_handles) {
+    if (!admitQuery(curr_query)) {
+      all_admitted = false;
+    }
+  }
+  return all_admitted;
+}
+
+// Append a (worker thread index, operator index, execution time in
+// microseconds) record for the query named in the completion message.
+void PriorityPolicyEnforcer::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  // operator[] creates an empty record list the first time a query reports,
+  // so a single hash lookup suffices (no separate find()/insert step).
+  workorder_time_recorder_[proto.query_id()].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/672a1934/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
new file mode 100644
index 0000000..94cbe38
--- /dev/null
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -0,0 +1,222 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PriorityPolicyEnforcer {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param worker_directory The WorkerDirectory whose queued work order
+   *        counts are updated as completion messages arrive.
+   * @param bus The TMB.
+   * @param profile_individual_workorders If true, record the execution time
+   *        of every normal work order so that getProfilingResults() can
+   *        report it.
+   **/
+  PriorityPolicyEnforcer(const tmb::client_id foreman_client_id,
+                         const std::size_t num_numa_nodes,
+                         CatalogDatabaseLite *catalog_database,
+                         StorageManager *storage_manager,
+                         WorkerDirectory *worker_directory,
+                         tmb::MessageBus *bus,
+                         const bool profile_individual_workorders = false)
+      : foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        catalog_database_(catalog_database),
+        storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
+        bus_(bus),
+        profile_individual_workorders_(profile_individual_workorders),
+        learner_(new Learner()) {
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PriorityPolicyEnforcer() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PriorityPolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system. False is returned
+   *         both when the query was waitlisted and when it was rejected
+   *         because a query with the same ID is already admitted.
+   **/
+  bool admitQuery(QueryHandle *query_handle);
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note At most kMaxConcurrentQueries queries are active at a time. A query
+   *       that arrives when that limit has been reached is waitlisted by
+   *       admitQuery() instead of being admitted.
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages The worker messages to be dispatched. Expected to
+   *        be empty on entry.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry.
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());
+    return workorder_time_recorder_.at(query_id);
+  }
+
+ private:
+  static constexpr std::size_t kMaxConcurrentQueries = 2;
+
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  CatalogDatabaseLite *catalog_database_;
+  StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
+
+  tmb::MessageBus *bus_;
+  const bool profile_individual_workorders_;
+
+  // Key = query ID, value = QueryManager* for the key query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  // Key = Query ID.
+  // Value = A tuple indicating a record of executing a work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
+  // Declared last, so it is initialized last in the constructor's member
+  // initializer list.
+  std::unique_ptr<Learner> learner_;
+
+  DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_


[04/21] incubator-quickstep git commit: Bug fix in initialization of probabilities.

Posted by hb...@apache.org.
Bug fix in initialization of probabilities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e7e497ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e7e497ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e7e497ea

Branch: refs/heads/scheduler++
Commit: e7e497eae99622dc2a7fa0fbed7e1c2cb085a25d
Parents: 083d17d
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:57:01 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                |  7 +++--
 query_execution/tests/Learner_unittest.cpp | 38 +++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e7e497ea/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 5d877b4..38a773b 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -184,9 +184,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   for (auto priority_iter = execution_stats_.cbegin();
        priority_iter != execution_stats_.cend();
        ++priority_iter) {
-    sum_priority_levels += priority_iter->second.size();
-    priority_levels.emplace_back(priority_iter->first);
-    numerators.emplace_back(priority_iter->first);
+    const std::size_t curr_priority_level = priority_iter->first;
+    sum_priority_levels += curr_priority_level;
+    priority_levels.emplace_back(curr_priority_level);
+    numerators.emplace_back(curr_priority_level);
   }
   if (sum_priority_levels > 0) {
     probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e7e497ea/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 74353f0..864bb22 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -84,4 +84,42 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
+TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+  // Two queries living at distinct priority levels are added one after the
+  // other and then removed in reverse order. After every step the learner's
+  // total and per-level active query counts are verified.
+  Learner learner;
+  const std::size_t kPriorityLevel1 = 1;
+  const std::size_t kPriorityLevel2 = 2;
+  std::unique_ptr<QueryHandle> handle1(new QueryHandle(1, kPriorityLevel1));
+  std::unique_ptr<QueryHandle> handle2(new QueryHandle(2, kPriorityLevel2));
+
+  // Verifies the learner's bookkeeping against the expected number of active
+  // queries in priority levels 1 and 2.
+  auto expect_active_counts = [&](const std::size_t expected_level1,
+                                  const std::size_t expected_level2) {
+    const std::size_t expected_total = expected_level1 + expected_level2;
+    if (expected_total > 0) {
+      EXPECT_TRUE(learner.hasActiveQueries());
+    } else {
+      EXPECT_FALSE(learner.hasActiveQueries());
+    }
+    EXPECT_EQ(expected_total, learner.getTotalNumActiveQueries());
+    EXPECT_EQ(expected_level1,
+              learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+    EXPECT_EQ(expected_level2,
+              learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  };
+
+  expect_active_counts(0u, 0u);
+
+  learner.addQuery(*handle1);
+  expect_active_counts(1u, 0u);
+
+  learner.addQuery(*handle2);
+  expect_active_counts(1u, 1u);
+
+  learner.removeQuery(handle2->query_id());
+  expect_active_counts(1u, 0u);
+
+  learner.removeQuery(handle1->query_id());
+  expect_active_counts(0u, 0u);
+}
+
 }  // namespace quickstep


[20/21] incubator-quickstep git commit: Add individual query time printing

Posted by hb...@apache.org.
Add individual query time printing

- Query entry/admission and completion times are made part of the query
  handle class.
- Bug fix in PriorityPolicyEnforcer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9664de1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9664de1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9664de1f

Branch: refs/heads/scheduler++
Commit: 9664de1fd6b2a2b2c37106c346899e0024234dc9
Parents: 3fc373d
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 15:00:44 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                       |  5 +++
 query_execution/Learner.cpp                |  1 +
 query_execution/Learner.hpp                |  3 +-
 query_execution/PriorityPolicyEnforcer.cpp | 11 ++++---
 query_execution/PriorityPolicyEnforcer.hpp |  4 +++
 query_optimizer/QueryHandle.hpp            | 41 +++++++++++++++++++++++++
 6 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 7ce9502..0aad29f 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -19,6 +19,7 @@
 
 /* A standalone command-line interface to QuickStep */
 
+#include <algorithm>
 #include <chrono>
 #include <cstddef>
 #include <cstdio>
@@ -529,6 +530,10 @@ int main(int argc, char* argv[]) {
               for (std::size_t i = 0; i < query_handles.size(); ++i) {
                 InputParserUtil::PrintAndDropOutputRelation(
                     query_handles[i], query_processor.get());
+                printf("Time: %s ms\n",
+                       quickstep::DoubleToStringWithSignificantDigits(
+                           query_handles[i]->getExecutionTimeMillis(), 3)
+                           .c_str());
               }
               query_processor->saveCatalog();
               if (quickstep::FLAGS_profile_and_report_workorder_perf) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 9801f60..183d688 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -245,6 +245,7 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     execution_stats_.erase(priority_level);
     current_probabilities_.erase(priority_level);
     probabilities_of_priority_levels_->removeObject(priority_level);
+    default_probabilities_.erase(priority_level);
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // has_feedback_from_all_queries_.erase(priority_level);
     if (hasActiveQueries()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index ef92db9..024dc9b 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -276,7 +276,7 @@ class Learner {
     CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
-      execution_stats_[priority_level];
+      execution_stats_.emplace(priority_level, std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>());
       if (static_cast<int>(priority_level) > highest_priority_level_) {
         highest_priority_level_ = priority_level;
       }
@@ -380,6 +380,7 @@ class Learner {
       const std::size_t priority_level) const {
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // return has_feedback_from_all_queries_.at(priority_level);
+    DCHECK(isPriorityLevelPresent(priority_level));
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_.at(priority_level);
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 0a15094..93908a9 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -43,7 +43,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
@@ -52,9 +51,11 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
+      query_handle->setAdmissionTime();
+      query_id_to_handle_[query_handle->query_id()] = query_handle;
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -63,6 +64,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
   } else {
     // This query will have to wait.
     LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+    query_id_to_handle_[query_handle->query_id()] = query_handle;
     waiting_queries_.push(query_handle);
     return false;
   }
@@ -224,10 +226,11 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
     // No more queries for the given priority level. Remove the entry.
     priority_query_ids_.erase(query_priority_unsigned);
   }
+  query_id_to_handle_[query_id]->setCompletionTime();
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
-  LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
-  // Admit waiting queries, if any.
+  // TODO(harshad) - Admit waiting queries, if any.
+  LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
 bool PriorityPolicyEnforcer::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index eafb099..1f13a10 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -218,6 +218,10 @@ class PriorityPolicyEnforcer {
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 
+  // Key = query ID, value = a pointer to the QueryHandle.
+  // Note - This map has entries for active and waiting queries only.
+  std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;
+
   // Key = Query ID.
   // Value = A tuple indicating a record of executing a work order.
   // Within a tuple ...

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9664de1f/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..04f672e 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -17,6 +17,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
 
+#include <chrono>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -119,6 +120,39 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+  void setEntryTime() {
+    entry_time_ = std::chrono::steady_clock::now();
+  }
+
+  void setAdmissionTime() {
+    admission_time_ = std::chrono::steady_clock::now();
+  }
+
+  void setCompletionTime() {
+    completion_time_ = std::chrono::steady_clock::now();
+  }
+
+  const std::chrono::steady_clock::time_point& getEntryTime() const {
+    return entry_time_;
+  }
+
+  const std::chrono::steady_clock::time_point& getAdmissionTime() const {
+    return admission_time_;
+  }
+
+  const std::chrono::steady_clock::time_point& getCompletionTime() const {
+    return completion_time_;
+  }
+
+  const double getExecutionTimeMillis() const {
+    return std::chrono::duration<double, std::milli>(completion_time_ - admission_time_)
+               .count();
+  }
+
+  const double getWaitingTimeMillis() const {
+    return std::chrono::duration<double, std::milli>(admission_time_ - entry_time_).count();
+  }
+
  private:
   const std::size_t query_id_;
   const std::uint64_t query_priority_;
@@ -134,6 +168,13 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+  // Time when query entered the system.
+  std::chrono::steady_clock::time_point entry_time_;
+  // Time when query was admitted to the system.
+  std::chrono::steady_clock::time_point admission_time_;
+  // Time when query finished its execution.
+  std::chrono::steady_clock::time_point completion_time_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };
 


[08/21] incubator-quickstep git commit: Select query from learner in PolicyEnforcer.

Posted by hb...@apache.org.
Select query from learner in PolicyEnforcer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ca34819f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ca34819f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ca34819f

Branch: refs/heads/scheduler++
Commit: ca34819fe331dc885f51a2f60ff0eac10eb16ceb
Parents: 672a193
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 12:25:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                |  5 +-
 query_execution/Learner.hpp                | 31 +++++----
 query_execution/PriorityPolicyEnforcer.cpp | 86 +++++++++++++++++++++----
 query_execution/PriorityPolicyEnforcer.hpp | 19 ++++++
 4 files changed, 112 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca34819f/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 11c3735..bb24baa 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -49,7 +49,7 @@ void Learner::addCompletionFeedback(
         &workorder_completion_proto) {
   const std::size_t query_id = workorder_completion_proto.query_id();
   DCHECK(isQueryPresent(query_id));
-  const std::size_t priority_level = getQueryPriority(query_id);
+  const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
   ExecutionStats *execution_stats = getExecutionStats(query_id);
   DCHECK(execution_stats != nullptr);
   execution_stats->addEntry(
@@ -106,12 +106,10 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
         query_id,
         1 / static_cast<float>(mean_workorders_per_query[query_id]),
         denominator);
-    // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use
     // the older probabilities.
-    // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
     return;
   }
 }
@@ -248,7 +246,6 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     probabilities_of_priority_levels_->removeObject(priority_level);
     // NOTE(harshad) : Not using this cache as it gets confusing.
     // has_feedback_from_all_queries_.erase(priority_level);
-    // LOG(INFO) << "Removed priority level: " << priority_level;
     if (hasActiveQueries()) {
       if (static_cast<int>(priority_level) == highest_priority_level_) {
         // The priority level to be removed is the highest priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca34819f/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 2c6fdef..8634842 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -70,7 +70,7 @@ class Learner {
    **/
   void removeQuery(const std::size_t query_id) {
     DCHECK(isQueryPresent(query_id));
-    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
     // Find the iterator to the query in execution_stats_.
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
@@ -158,9 +158,6 @@ class Learner {
     if (hasActiveQueries()) {
       const int result = static_cast<int>(
           probabilities_of_priority_levels_->pickRandomProperty());
-      /*LOG(INFO) << "Random priority level: " << result << " has "
-                << current_probabilities_.find(result)->second->getNumObjects()
-                << " queries";*/
       return result;
     } else {
       return kInvalidPriorityLevel;
@@ -182,10 +179,8 @@ class Learner {
       DCHECK_GT(random_priority_level, 0);
       const int result = pickRandomQueryFromPriorityLevel(
           static_cast<std::size_t>(random_priority_level));
-      // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
       return result;
     } else {
-      // LOG(INFO) << "No active query right now";
       return kInvalidQueryID;
     }
   }
@@ -209,7 +204,6 @@ class Learner {
           return static_cast<int>(
               current_probabilities_.at(priority_level)->pickRandomProperty());
         }
-        // LOG(INFO) << "No queries in priority level: " << priority_level;
       } else {
         DCHECK(default_probabilities_.at(priority_level) != nullptr);
         const auto it = default_probabilities_.find(priority_level);
@@ -217,12 +211,24 @@ class Learner {
           return static_cast<int>(
               default_probabilities_.at(priority_level)->pickRandomProperty());
         }
-        // LOG(INFO) << "No queries in priority level: " << priority_level;
       }
     }
     return kInvalidQueryID;
   }
 
+  /**
+   * @brief Given a query ID, if the query exists in the learner, return its
+   *        priority, otherwise return kInvalidPriorityLevel.
+   **/
+  inline int getQueryPriority(const std::size_t query_id) const {
+    const auto it = query_id_to_priority_lookup_.find(query_id);
+    if (it != query_id_to_priority_lookup_.end()) {
+      return it->second;
+    } else {
+      return kInvalidPriorityLevel;
+    }
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -324,7 +330,7 @@ class Learner {
     if (isQueryPresent(query_id)) {
       const auto stats_iter = getExecutionStatsIterMutable(query_id);
       DCHECK(stats_iter !=
-             std::end(execution_stats_[getQueryPriority(query_id)]));
+             std::end(execution_stats_[getQueryPriorityUnsafe(query_id)]));
       return stats_iter->second.get();
     }
     return nullptr;
@@ -339,7 +345,7 @@ class Learner {
   inline std::vector<
       std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
       getExecutionStatsIterMutable(const std::size_t query_id) {
-    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_[priority_level];
     // The following line uses std::find_if to reach to the desired element
@@ -356,8 +362,11 @@ class Learner {
 
   /**
    * @brief Get a query's priority level given its ID.
+   *
+   * @note This version assumes that the given query ID exists in data
+   *       structures.
    **/
-  inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+  inline const std::size_t getQueryPriorityUnsafe(const std::size_t query_id) const {
     const auto it = query_id_to_priority_lookup_.find(query_id);
     DCHECK(it != query_id_to_priority_lookup_.end());
     return it->second;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca34819f/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 44ccb0a..6467367 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -53,6 +53,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
       LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       return true;
     } else {
@@ -71,6 +72,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   // QueryManager, so that we need to extract message from the
   // TaggedMessage only once.
   std::size_t query_id;
+  std::size_t operator_id;
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
       serialization::NormalWorkOrderCompletionMessage proto;
@@ -79,6 +81,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      operator_id = proto.operator_index();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
       learner_->addCompletionFeedback(proto);
@@ -94,6 +97,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();
+      operator_id = proto.operator_index();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
       break;
@@ -132,6 +136,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   const QueryManager::QueryStatusCode return_code =
       admitted_queries_[query_id]->processMessage(tagged_message);
+  //NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
@@ -140,6 +145,8 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
       waiting_queries_.pop();
       admitQuery(new_query);
     }
+  } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
+    learner_->removeOperator(query_id, operator_id);
   }
 }
 
@@ -161,26 +168,25 @@ void PriorityPolicyEnforcer::getWorkerMessages(
   DCHECK_GT(per_query_share, 0u);
   std::vector<std::size_t> finished_queries_ids;
 
-  for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManager *curr_query_manager = admitted_query_info.second.get();
-    DCHECK(curr_query_manager != nullptr);
-    std::size_t messages_collected_curr_query = 0;
-    while (messages_collected_curr_query < per_query_share) {
+  if (learner_->hasActiveQueries()) {
+    // Key = priority level. Value = Whether we have already checked the
+    std::unordered_map<std::size_t, bool> checked_priority_levels;
+    // While there are more priority levels to be checked ..
+    while (checked_priority_levels.size() < priority_query_ids_.size()) {
+      const int chosen_priority_level = learner_->pickRandomPriorityLevel();
+      DCHECK(chosen_priority_level != kInvalidPriorityLevel);
       WorkerMessage *next_worker_message =
-          curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+          getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+                                                &finished_queries_ids);
       if (next_worker_message != nullptr) {
-        ++messages_collected_curr_query;
         worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
       } else {
-        // No more work ordes from the current query at this time.
-        // Check if the query's execution is over.
-        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          // If the query has been executed, remove it.
-          finished_queries_ids.push_back(admitted_query_info.first);
-        }
-        break;
+        checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
       }
     }
+  } else {
+    LOG(INFO) << "No active queries in the learner at this point.";
+    return;
   }
   for (const std::size_t finished_qid : finished_queries_ids) {
     removeQuery(finished_qid);
@@ -194,6 +200,22 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
+  // Remove the query from priority_query_ids_ structure.
+  const int query_priority = learner_->getQueryPriority(query_id);
+  DCHECK(query_priority != kInvalidPriorityLevel);
+  const std::size_t query_priority_unsigned =
+      static_cast<std::size_t>(query_priority);
+  std::vector<std::size_t> *query_ids_for_priority_level =
+      &priority_query_ids_[query_priority_unsigned];
+  query_ids_for_priority_level->erase(
+      std::remove(query_ids_for_priority_level->begin(),
+                  query_ids_for_priority_level->end(),
+                  query_id));
+  if (query_ids_for_priority_level->empty()) {
+    // No more queries for the given priority level. Remove the entry.
+    priority_query_ids_.erase(query_priority_unsigned);
+  }
+  // Remove the query from the learner.
   learner_->removeQuery(query_id);
 }
 
@@ -219,4 +241,40 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
       proto.execution_time_in_microseconds());
 }
 
+WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
+    const std::size_t priority_level,
+    std::vector<std::size_t> *finished_queries_ids) {
+  // Key = query ID from the given priority level, value = whether we have
+  // checked this query earlier.
+  std::unordered_map<std::size_t, bool> checked_query_ids;
+  // While there are more queries to be checked ..
+  while (checked_query_ids.size() < priority_query_ids_[priority_level].size()) {
+    const int chosen_query_id = learner_->pickRandomQueryFromPriorityLevel(priority_level);
+    if (chosen_query_id == kInvalidQueryID) {
+      // No query available at this time in this priority level.
+      return nullptr;
+    } else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
+      // We have already seen this query ID, try one more time.
+      LOG(INFO) << "We have already seen this query, continue";
+      continue;
+    } else {
+      // We haven't seen this query earlier. Check if it has any schedulable
+      // WorkOrder.
+      QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(chosen_query_id)].get();
+      DCHECK(chosen_query_manager != nullptr);
+      std::unique_ptr<WorkerMessage> next_worker_message(chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+      if (next_worker_message != nullptr) {
+        return next_worker_message.release();
+      } else {
+        // This query doesn't have any WorkerMessage right now. Mark as checked.
+        checked_query_ids[chosen_query_id] = true;
+        if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+        }
+      }
+    }
+  }
+  return nullptr;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ca34819f/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 94cbe38..281c066 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -183,6 +183,21 @@ class PriorityPolicyEnforcer {
   void recordTimeForWorkOrder(
       const serialization::NormalWorkOrderCompletionMessage &proto);
 
+  /**
+   * @brief get a WorkerMessage from the chosen priority level.
+   *
+   * @param priority_level The priority level from which the query will be
+   *        chosen.
+   * @param finished_query_ids A vector of query IDs that have finished their
+   *        execution.
+   *
+   * @return A WorkerMessage. If no query can be chosen from this priority level,
+   *         return NULL.
+   **/
+  WorkerMessage *getNextWorkerMessageFromPriorityLevel(
+      const std::size_t priority_level,
+      std::vector<std::size_t> *finished_queries_ids);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -193,6 +208,10 @@ class PriorityPolicyEnforcer {
   tmb::MessageBus *bus_;
   const bool profile_individual_workorders_;
 
+  // Key = priority level, value = a vector of IDs of the queries belonging to
+  // the key priority level.
+  std::unordered_map<std::size_t, std::vector<std::size_t>> priority_query_ids_;
+
   // Key = query ID, value = QueryManager* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
 


[18/21] incubator-quickstep git commit: Cap on number of work orders per dispatch round enforced

Posted by hb...@apache.org.
Enforce a cap on the number of work orders collected per dispatch round.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/557ac9a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/557ac9a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/557ac9a3

Branch: refs/heads/scheduler++
Commit: 557ac9a3389214d28d82124835cb64facf421813
Parents: de19aeb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jul 5 10:42:34 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 query_execution/Foreman.cpp                |  3 ++-
 query_execution/PriorityPolicyEnforcer.cpp | 14 ++------------
 2 files changed, 4 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/557ac9a3/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 0898ac1..9824ca7 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -46,7 +46,7 @@ using std::vector;
 
 namespace quickstep {
 
-DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
+DEFINE_uint64(min_load_per_worker, 1, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
 
@@ -154,6 +154,7 @@ void Foreman::run() {
     if (canCollectNewMessages(message_type)) {
       vector<unique_ptr<WorkerMessage>> new_messages;
       policy_enforcer_->getWorkerMessages(&new_messages);
+      std::cout << "# Messages: " << new_messages.size() << "\n";
       dispatchWorkerMessages(new_messages);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/557ac9a3/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 84082ed..7d0d46f 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -38,7 +38,7 @@
 
 namespace quickstep {
 
-DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+DEFINE_uint64(max_msgs_per_dispatch_round, 80, "Maximum number of messages that"
               " can be allocated in a single round of dispatch of messages to"
               " the workers.");
 
@@ -160,23 +160,13 @@ void PriorityPolicyEnforcer::getWorkerMessages(
   // messages available, or the maximum number of messages have
   // been collected.
   DCHECK(worker_messages->empty());
-  // TODO(harshad) - Make this function generic enough so that it
-  // works well when multiple queries are getting executed.
-  std::size_t per_query_share = 0;
-  if (!admitted_queries_.empty()) {
-    per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
-  } else {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
-    return;
-  }
-  DCHECK_GT(per_query_share, 0u);
   std::unordered_map<std::size_t, bool> finished_queries_ids;
 
   if (learner_->hasActiveQueries()) {
     // Key = priority level. Value = Whether we have already checked the
     std::unordered_map<std::size_t, bool> checked_priority_levels;
     // While there are more priority levels to be checked ..
-    while (checked_priority_levels.size() < priority_query_ids_.size()) {
+    while (checked_priority_levels.size() < priority_query_ids_.size() && worker_messages->size() < FLAGS_max_msgs_per_dispatch_round) {
       const int chosen_priority_level = learner_->pickRandomPriorityLevel();
       if (chosen_priority_level == kInvalidPriorityLevel) {
         DLOG(INFO) << "No valid priority level chosen";


[15/21] incubator-quickstep git commit: CLI support to admit a workload.

Posted by hb...@apache.org.
CLI support to admit a workload.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3fc373dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3fc373dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3fc373dc

Branch: refs/heads/scheduler++
Commit: 3fc373dc03f7cc511405c5e1a0d7de97f3882b61
Parents: fb05e6e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 29 15:59:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                             |   2 -
 cli/CMakeLists.txt                         |   4 +
 cli/InputParserUtil.cpp                    |  30 +++
 cli/InputParserUtil.hpp                    |   6 +
 cli/QuickstepCli.cpp                       | 280 +++++++++++++++---------
 query_execution/CMakeLists.txt             |   1 +
 query_execution/Learner.cpp                |  11 +-
 query_execution/Learner.hpp                |   2 -
 query_execution/PriorityPolicyEnforcer.cpp |  38 +++-
 query_execution/PriorityPolicyEnforcer.hpp |   6 +-
 query_execution/ProbabilityStore.hpp       |   2 +-
 11 files changed, 262 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20e1fb9..9d3c413 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -741,10 +741,8 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_catalog_CatalogRelation
                       quickstep_cli_CommandExecutor
                       quickstep_cli_DefaultsConfigurator
-                      quickstep_cli_DropRelation
                       quickstep_cli_InputParserUtil
                       quickstep_cli_LineReader
-                      quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_AdmitRequestMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..a85d52c 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -113,6 +113,10 @@ if(QUICKSTEP_HAVE_LIBNUMA)
 endif()
 target_link_libraries(quickstep_cli_InputParserUtil
                       glog
+                      quickstep_cli_DropRelation
+                      quickstep_cli_PrintToScreen
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryProcessor
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 if(QUICKSTEP_HAVE_LIBNUMA)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 352883e..ffc997c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -24,6 +24,10 @@
 #include <vector>
 
 #include "catalog/CatalogConfig.h"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConfig.h"
 #include "utility/StringUtil.hpp"
 
@@ -36,6 +40,12 @@
 using std::string;
 
 namespace quickstep {
+  class CatalogRelation;
+  class CatalogDatabase;
+  class StorageManager;
+}
+
+namespace quickstep {
 
 std::vector<int> InputParserUtil::ParseWorkerAffinities(
     const int num_workers,
@@ -87,4 +97,24 @@ std::vector<int> InputParserUtil::GetNUMANodesForCPUs() {
   return numa_nodes_of_cpus;
 }
 
+void InputParserUtil::PrintAndDropOutputRelation(
+    QueryHandle *query_handle, QueryProcessor *query_processor) {
+  const CatalogRelation *query_result_relation =
+      query_handle->getQueryResultRelation();
+  if (query_result_relation != nullptr) {
+    PrintToScreen::PrintRelation(*query_result_relation,
+                                 query_processor->getStorageManager(),
+                                 stdout);
+    PrintToScreen::PrintOutputSize(
+        *query_result_relation,
+        query_processor->getStorageManager(),
+        stdout);
+
+    DropRelation::Drop(*query_result_relation,
+                       query_processor->getDefaultDatabase(),
+                       query_processor->getStorageManager());
+  }
+
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/cli/InputParserUtil.hpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.hpp b/cli/InputParserUtil.hpp
index ebb32d2..42e9804 100644
--- a/cli/InputParserUtil.hpp
+++ b/cli/InputParserUtil.hpp
@@ -24,6 +24,9 @@
 
 namespace quickstep {
 
+class QueryHandle;
+class QueryProcessor;
+
 /** \addtogroup CLI
  *  @{
  */
@@ -60,6 +63,9 @@ class InputParserUtil {
    **/
   static std::vector<int> GetNUMANodesForCPUs();
 
+  static void PrintAndDropOutputRelation(QueryHandle *query_handle,
+                                         QueryProcessor *query_processor);
+
  private:
   /**
    * @brief Private constructor to disable instantiation of the class.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index c34e389..7ce9502 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -161,6 +161,8 @@ DEFINE_bool(initialize_db, false, "If true, initialize a database.");
 DEFINE_bool(print_query, false,
             "Print each input query statement. This is useful when running a "
             "large number of queries in a batch.");
+DEFINE_bool(accept_workload, false, "If true, accept a workload through CLI, "
+            "otherwise execute one query at a time");
 DEFINE_string(profile_file_name, "",
               "If nonempty, enable profiling using GOOGLE CPU Profiler, and write "
               "its output to the given file name. This flag has no effect if "
@@ -377,129 +379,209 @@ int main(int argc, char* argv[]) {
 #ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
   bool started_profiling = false;
 #endif
-  for (;;) {
-    string *command_string = new string();
-    *command_string = line_reader.getNextCommand();
-    if (command_string->size() == 0) {
-      delete command_string;
-      break;
-    }
+  if (!quickstep::FLAGS_accept_workload) {
+    for (;;) {
+      string *command_string = new string();
+      *command_string = line_reader.getNextCommand();
+      if (command_string->size() == 0) {
+        delete command_string;
+        break;
+      }
 
-    if (quickstep::FLAGS_print_query) {
-      printf("\n%s\n", command_string->c_str());
-    }
+      if (quickstep::FLAGS_print_query) {
+        printf("\n%s\n", command_string->c_str());
+      }
 
-    parser_wrapper->feedNextBuffer(command_string);
+      parser_wrapper->feedNextBuffer(command_string);
+
+      bool quitting = false;
+      // A parse error should reset the parser. This is because the thrown quickstep
+      // SqlError does not do the proper reset work of the YYABORT macro.
+      bool reset_parser = false;
+      for (;;) {
+        ParseResult result = parser_wrapper->getNextStatement();
+        if (result.condition == ParseResult::kSuccess) {
+          if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
+            quitting = true;
+            break;
+          }
 
-    bool quitting = false;
-    // A parse error should reset the parser. This is because the thrown quickstep
-    // SqlError does not do the proper reset work of the YYABORT macro.
-    bool reset_parser = false;
-    for (;;) {
-      ParseResult result = parser_wrapper->getNextStatement();
-      if (result.condition == ParseResult::kSuccess) {
-        if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
-          quitting = true;
-          break;
-        }
+          if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+            try {
+              quickstep::cli::executeCommand(
+                  *result.parsed_statement,
+                  *(query_processor->getDefaultDatabase()),
+                  main_thread_client_id,
+                  foreman.getBusClientID(),
+                  &bus,
+                  query_processor->getStorageManager(),
+                  query_processor.get(),
+                  stdout);
+            } catch (const quickstep::SqlError &sql_error) {
+              fprintf(stderr, "%s",
+                      sql_error.formatMessage(*command_string).c_str());
+              reset_parser = true;
+              break;
+            }
+          continue;
+          }
 
-        if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+          std::unique_ptr<QueryHandle> query_handle;
           try {
-            quickstep::cli::executeCommand(
-                *result.parsed_statement,
-                *(query_processor->getDefaultDatabase()),
-                main_thread_client_id,
-                foreman.getBusClientID(),
-                &bus,
-                query_processor->getStorageManager(),
-                query_processor.get(),
-                stdout);
+            query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
           } catch (const quickstep::SqlError &sql_error) {
-            fprintf(stderr, "%s",
-                    sql_error.formatMessage(*command_string).c_str());
+            fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
             reset_parser = true;
             break;
           }
-        continue;
-        }
 
-        std::unique_ptr<QueryHandle> query_handle;
-        try {
-          query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
-        } catch (const quickstep::SqlError &sql_error) {
-          fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+          DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+          std::vector<QueryHandle*> query_handles;
+          query_handles.push_back(query_handle.get());
+          start = std::chrono::steady_clock::now();
+          QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+              main_thread_client_id,
+              foreman.getBusClientID(),
+              &query_handles,
+              &bus);
+
+          try {
+            QueryExecutionUtil::ReceiveQueryCompletionMessage(
+                main_thread_client_id, &bus);
+            end = std::chrono::steady_clock::now();
+
+            InputParserUtil::PrintAndDropOutputRelation(query_handle.get(), query_processor.get());
+            query_processor->saveCatalog();
+            std::chrono::duration<double, std::milli> time_ms = end - start;
+            printf("Time: %s ms\n",
+                   quickstep::DoubleToStringWithSignificantDigits(
+                       time_ms.count(), 3).c_str());
+            if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+              // TODO(harshad) - Allow user specified file instead of stdout.
+              foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+                                                     stdout);
+            }
+          } catch (const std::exception &e) {
+            fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+            break;
+          }
+        } else {
+          if (result.condition == ParseResult::kError) {
+            fprintf(stderr, "%s", result.error_message.c_str());
+          }
           reset_parser = true;
           break;
         }
+#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
+        // Profile only if profile_file_name flag is set
+        if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
+          started_profiling = true;
+          ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+        }
+#endif
+      }
 
-        DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-        std::vector<QueryHandle*> query_handles;
-        query_handles.push_back(query_handle.get());
-        start = std::chrono::steady_clock::now();
-        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-            main_thread_client_id,
-            foreman.getBusClientID(),
-            &query_handles,
-            &bus);
-
-        try {
-          QueryExecutionUtil::ReceiveQueryCompletionMessage(
-              main_thread_client_id, &bus);
-          end = std::chrono::steady_clock::now();
-
-          const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
-          if (query_result_relation) {
-            PrintToScreen::PrintRelation(*query_result_relation,
-                                         query_processor->getStorageManager(),
-                                         stdout);
-            PrintToScreen::PrintOutputSize(
-                *query_result_relation,
-                query_processor->getStorageManager(),
-                stdout);
-
-            DropRelation::Drop(*query_result_relation,
-                               query_processor->getDefaultDatabase(),
-                               query_processor->getStorageManager());
+      if (quitting) {
+        break;
+      } else if (reset_parser) {
+        parser_wrapper.reset(new SqlParserWrapper());
+        reset_parser = false;
+      }
+    }
+  } else {
+    std::vector<QueryHandle*> query_handles;
+    for (;;) {
+      bool end_of_input = false;
+      // A parse error should reset the parser. This is because the thrown quickstep
+      // SqlError does not do the proper reset work of the YYABORT macro.
+      bool reset_parser = false;
+      string *command_string = new string();
+      *command_string = line_reader.getNextCommand();
+      if (command_string->size() == 0) {
+        delete command_string;
+        end_of_input = true;
+        reset_parser = true;
+      } else {
+        if (quickstep::FLAGS_print_query) {
+          printf("\n%s\n", command_string->c_str());
+        }
+        parser_wrapper->feedNextBuffer(command_string);
+        end_of_input = false;
+        reset_parser = false;
+      }
+      for (;;) {
+        ParseResult result = parser_wrapper->getNextStatement();
+        // Check if the input has ended.
+        if (end_of_input && (result.condition == ParseResult::kEndOfInput ||
+                             (result.condition == ParseResult::kSuccess &&
+                              result.parsed_statement->getStatementType() ==
+                                  ParseStatement::kQuit))) {
+          if (!query_handles.empty()) {
+            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+                main_thread_client_id,
+                foreman.getBusClientID(),
+                &query_handles,
+                &bus);
+            try {
+              QueryExecutionUtil::ReceiveQueryCompletionMessage(
+                  main_thread_client_id, &bus);
+
+              for (std::size_t i = 0; i < query_handles.size(); ++i) {
+                InputParserUtil::PrintAndDropOutputRelation(
+                    query_handles[i], query_processor.get());
+              }
+              query_processor->saveCatalog();
+              if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+                // TODO(harshad) - Allow user specified file instead of stdout.
+                for (std::size_t i = 0; i < query_handles.size(); ++i) {
+                  foreman.printWorkOrderProfilingResults(
+                      query_handles[i]->query_id(), stdout);
+                }
+              }
+            } catch (const std::exception &e) {
+              fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+              break;
+            }
           }
-
-          query_processor->saveCatalog();
-          std::chrono::duration<double, std::milli> time_ms = end - start;
-          printf("Time: %s ms\n",
-                 quickstep::DoubleToStringWithSignificantDigits(
-                     time_ms.count(), 3).c_str());
-          if (quickstep::FLAGS_profile_and_report_workorder_perf) {
-            // TODO(harshad) - Allow user specified file instead of stdout.
-            foreman.printWorkOrderProfilingResults(query_handle->query_id(),
-                                                   stdout);
+          reset_parser = true;
+          break;
+        } else if (result.condition == ParseResult::kSuccess) {
+          if (result.parsed_statement->getStatementType() == ParseStatement::kSelect) {
+            std::unique_ptr<QueryHandle> query_handle;
+            try {
+              query_handle.reset(query_processor->generateQueryHandle(
+                  *result.parsed_statement));
+            } catch (const quickstep::SqlError &sql_error) {
+              fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+              reset_parser = true;
+              break;
+            }
+
+            DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+            query_handles.push_back(query_handle.release());
+          } else {
+            LOG(INFO) << "Only select queries are accepted in the workload";
           }
-        } catch (const std::exception &e) {
-          fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
           break;
-        }
-      } else {
-        if (result.condition == ParseResult::kError) {
+        } else if (result.condition == ParseResult::kError) {
           fprintf(stderr, "%s", result.error_message.c_str());
+          reset_parser = true;
+          break;
+        } else {
+          LOG(FATAL) << "Unhandled case";
+          break;
         }
-        reset_parser = true;
-        break;
       }
-#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
-      // Profile only if profile_file_name flag is set
-      if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
-        started_profiling = true;
-        ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+      if (end_of_input) {
+        break;
+      } else if (reset_parser) {
+        parser_wrapper.reset(new SqlParserWrapper());
+        reset_parser = false;
       }
-#endif
-    }
-
-    if (quitting) {
-      break;
-    } else if (reset_parser) {
-      parser_wrapper.reset(new SqlParserWrapper());
-      reset_parser = false;
     }
   }
 
+
 #ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
   if (started_profiling) {
     ProfilerStop();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 104f9da..13b74e3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -242,6 +242,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_PriorityPolicyEnforcer
+                      quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index bb24baa..9801f60 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -101,11 +101,12 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     // queries.
     DCHECK(mean_workorders_per_query.find(query_id) !=
            mean_workorders_per_query.end());
-    DCHECK_NE(mean_workorders_per_query[query_id], 0);
-    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
-        query_id,
-        1 / static_cast<float>(mean_workorders_per_query[query_id]),
-        denominator);
+    if (mean_workorders_per_query[query_id] != 0) {
+      current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+          query_id,
+          1 / static_cast<float>(mean_workorders_per_query[query_id]),
+          denominator);
+    }
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8634842..ef92db9 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -451,8 +451,6 @@ class Learner {
     for (const auto &element : mean_workorder_per_query) {
       if (element.second != 0) {
         denominator += 1/static_cast<float>(element.second);
-      /*} else {
-        return 0;*/
       }
     }
     return denominator;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index f9a741d..0a15094 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,6 +62,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
+    LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
     waiting_queries_.push(query_handle);
     return false;
   }
@@ -166,7 +167,7 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     return;
   }
   DCHECK_GT(per_query_share, 0u);
-  std::vector<std::size_t> finished_queries_ids;
+  std::unordered_map<std::size_t, bool> finished_queries_ids;
 
   if (learner_->hasActiveQueries()) {
     // Key = priority level. Value = Whether we have already checked the
@@ -196,8 +197,8 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     DLOG(INFO) << "No active queries in the learner at this point.";
     return;
   }
-  for (const std::size_t finished_qid : finished_queries_ids) {
-    removeQuery(finished_qid);
+  for (auto finished_qid_pair : finished_queries_ids) {
+    removeQuery(finished_qid_pair.first);
   }
 }
 
@@ -225,6 +226,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   }
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
+  LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
+  // Admit waiting queries, if any.
 }
 
 bool PriorityPolicyEnforcer::admitQueries(
@@ -251,7 +254,7 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
 
 WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
     const std::size_t priority_level,
-    std::vector<std::size_t> *finished_queries_ids) {
+    std::unordered_map<std::size_t, bool> *finished_queries_ids) {
   // Key = query ID from the given priority level, value = whether we have
   // checked this query earlier.
   std::unordered_map<std::size_t, bool> checked_query_ids;
@@ -262,9 +265,28 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
       // No query available at this time in this priority level.
       return nullptr;
     } else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
-      // We have already seen this query ID, try one more time.
-      LOG(INFO) << "We have already seen this query, continue";
-      continue;
+      // Find a query from the same priority level, but not present in the
+      // checked_query_ids map.
+      for (const std::size_t qid : priority_query_ids_[priority_level]) {
+        if (checked_query_ids.find(qid) == checked_query_ids.end() &&
+            finished_queries_ids->find(qid) == finished_queries_ids->end()) {
+          // Query not seen already.
+          QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(qid)].get();
+          DCHECK(chosen_query_manager != nullptr);
+          std::unique_ptr<WorkerMessage> next_worker_message(
+              chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+          if (next_worker_message != nullptr) {
+            // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
+            return next_worker_message.release();
+          } else {
+            // This query doesn't have any WorkerMessage right now. Mark as checked.
+            checked_query_ids[qid] = true;
+            if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+              (*finished_queries_ids)[static_cast<std::size_t>(qid)] = true;
+            }
+          }
+        }
+      }
     } else {
       // We haven't seen this query earlier. Check if it has any schedulable
       // WorkOrder.
@@ -277,7 +299,7 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
         // This query doesn't have any WorkerMessage right now. Mark as checked.
         checked_query_ids[chosen_query_id] = true;
         if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+          (*finished_queries_ids)[static_cast<std::size_t>(chosen_query_id)] = true;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 281c066..eafb099 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -170,7 +170,7 @@ class PriorityPolicyEnforcer {
   }
 
  private:
-  static constexpr std::size_t kMaxConcurrentQueries = 2;
+  static constexpr std::size_t kMaxConcurrentQueries = 100;
 
   /**
    * @brief Record the execution time for a finished WorkOrder.
@@ -188,7 +188,7 @@ class PriorityPolicyEnforcer {
    *
    * @param priority_level The priority level from which the query will be
    *        chosen.
-   * @param finished_query_ids A vector of query IDs that have finished their
+   * @param finished_query_ids A map of query IDs that have finished their
    *        execution.
    *
    * @return A WorkerMessage. If no query can be chosen from this priority level,
@@ -196,7 +196,7 @@ class PriorityPolicyEnforcer {
    **/
   WorkerMessage *getNextWorkerMessageFromPriorityLevel(
       const std::size_t priority_level,
-      std::vector<std::size_t> *finished_queries_ids);
+      std::unordered_map<std::size_t, bool> *finished_queries_ids);
 
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc373dc/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 7278e2b..4ae085e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -286,7 +286,7 @@ class ProbabilityStore {
     for (auto it = individual_probabilities_.begin();
          it != individual_probabilities_.end();
          ++it) {
-      DCHECK_LE(it->second.first, common_denominator_);
+      // DCHECK_LE(it->second.first, common_denominator_);
       it->second.second = it->second.first / common_denominator_;
     }
     updateCumulativeProbabilities();


[16/21] incubator-quickstep git commit: Function to log work order execution statistics.

Posted by hb...@apache.org.
Add a function to log work order execution statistics.

- Change log severity levels.
- Minor style fixes.
- Add a minor logging message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/de19aeb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/de19aeb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/de19aeb6

Branch: refs/heads/scheduler++
Commit: de19aeb61f47e32fa1f2efe89b4b5752773314b9
Parents: 9ec8910
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jul 1 11:38:18 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 cli/InputParserUtil.cpp                         |  7 ++---
 query_execution/CMakeLists.txt                  |  2 --
 query_execution/Learner.cpp                     | 29 +++++++++++++++++---
 query_execution/Learner.hpp                     |  3 ++
 query_execution/PriorityPolicyEnforcer.cpp      | 13 ++++-----
 query_execution/QueryExecutionMessages.proto    |  6 ++++
 query_execution/QueryExecutionUtil.hpp          |  1 +
 query_execution/Worker.cpp                      |  5 ++++
 .../tests/ExecutionGeneratorTestRunner.cpp      |  1 +
 9 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index ffc997c..74a2188 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -40,9 +40,9 @@
 using std::string;
 
 namespace quickstep {
-  class CatalogRelation;
-  class CatalogDatabase;
-  class StorageManager;
+class CatalogRelation;
+class CatalogDatabase;
+class StorageManager;
 }
 
 namespace quickstep {
@@ -114,7 +114,6 @@ void InputParserUtil::PrintAndDropOutputRelation(
                        query_processor->getDefaultDatabase(),
                        query_processor->getStorageManager());
   }
-
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 13b74e3..9ab86b2 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -107,7 +107,6 @@ target_link_libraries(quickstep_queryexecution_Learner
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
-                      quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
@@ -122,7 +121,6 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
 target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
-                      quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_Learner
                       quickstep_queryexecution_ProbabilityStore

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 3c6f42b..56e9626 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstddef>
+#include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -42,6 +43,8 @@ DEFINE_uint64(max_past_entries_learner,
 Learner::Learner()
     : highest_priority_level_(kInvalidPriorityLevel) {
   probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  // Format: Query ID, Operator ID, Worker ID, Time in micros, WO execution end timestamp.
+  LOG(INFO) << "Query ID|Operator ID|Worker ID|Time in microseconds|Workorder end timestamp";
 }
 
 void Learner::addCompletionFeedback(
@@ -61,16 +64,17 @@ void Learner::addCompletionFeedback(
   }
   updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
   updateProbabilitiesOfAllPriorityLevels();
-  printProbabilitiesForPriorityLevel(priority_level);
+  // printProbabilitiesForPriorityLevel(priority_level);
+  printWorkOrderDetails(workorder_completion_proto);
 }
 
 void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     const std::size_t priority_level, const std::size_t query_id) {
   DCHECK(isPriorityLevelPresent(priority_level));
   if (execution_stats_[priority_level].empty()) {
-    LOG(INFO) << "Updating probabilities for query ID: " << query_id
-              << " and priority level: " << priority_level
-              << " that has no queries";
+    LOG(WARNING) << "Updating probabilities for query ID: " << query_id
+                 << " and priority level: " << priority_level
+                 << " that has no queries";
     return;
   } else if (execution_stats_[priority_level].size() == 1u) {
     DCHECK(current_probabilities_[priority_level] != nullptr);
@@ -286,4 +290,21 @@ void Learner::printProbabilitiesForPriorityLevel(const std::size_t priority_leve
   }
 }
 
+void Learner::printWorkOrderDetails(
+    const serialization::NormalWorkOrderCompletionMessage &proto) const {
+  // Format: Query ID, Operator ID, Worker ID, Time in micros, WO execution end timestamp.
+  std::string result = "";
+  result.reserve(30);
+  result += std::to_string(proto.query_id());  // 2 chars
+  result += "|";
+  result += std::to_string(proto.operator_index());  // 2 chars
+  result += "|";
+  result += std::to_string(proto.worker_thread_index());  // 2 chars
+  result += "|";
+  result += std::to_string(proto.execution_time_in_microseconds());  // 5 chars
+  result += "|";
+  result += std::to_string(proto.execution_end_timestamp());  // 12 chars
+  LOG(INFO) << result;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 6cea325..8654544 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -472,6 +472,9 @@ class Learner {
 
   void printProbabilitiesForPriorityLevel(const std::size_t priority_level);
 
+  void printWorkOrderDetails(
+      const serialization::NormalWorkOrderCompletionMessage &proto) const;
+
   // Key = Priority level, value = A vector of pairs.
   // Each pair:
   // 1st element: Query ID.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 93908a9..84082ed 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -51,7 +51,8 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
+      DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id()
+                 << " priority: " << query_handle->query_priority();
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       query_handle->setAdmissionTime();
@@ -63,7 +64,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
-    LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+    DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
     query_id_to_handle_[query_handle->query_id()] = query_handle;
     waiting_queries_.push(query_handle);
     return false;
@@ -139,7 +140,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   const QueryManager::QueryStatusCode return_code =
       admitted_queries_[query_id]->processMessage(tagged_message);
-  //NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
+  // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
@@ -178,11 +179,10 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     while (checked_priority_levels.size() < priority_query_ids_.size()) {
       const int chosen_priority_level = learner_->pickRandomPriorityLevel();
       if (chosen_priority_level == kInvalidPriorityLevel) {
-        LOG(INFO) << "No valid priority level chosen";
+        DLOG(INFO) << "No valid priority level chosen";
         break;
       } else if (checked_priority_levels.find(static_cast<std::size_t>(
                      chosen_priority_level)) != checked_priority_levels.end()) {
-        DLOG(INFO) << "The chosen priority level " << chosen_priority_level << " was checked already";
         continue;
       } else {
         WorkerMessage *next_worker_message =
@@ -230,7 +230,7 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
   // TODO(harshad) - Admit waiting queries, if any.
-  LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
+  DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
 bool PriorityPolicyEnforcer::admitQueries(
@@ -279,7 +279,6 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
           std::unique_ptr<WorkerMessage> next_worker_message(
               chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
           if (next_worker_message != nullptr) {
-            // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
             return next_worker_message.release();
           } else {
             // This query doesn't have any WorkerMessage right now. Mark as checked.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 65a8946..db35ecc 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -39,6 +39,8 @@ message NormalWorkOrderCompletionMessage {
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
   optional uint64 execution_time_in_microseconds = 4;
+  // Indicates time elapsed in microseconds since epoch.  
+  optional uint64 execution_end_timestamp = 5;  
 }
 
 // A message sent upon completion of a rebuild WorkOrder execution.
@@ -47,6 +49,10 @@ message RebuildWorkOrderCompletionMessage {
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
   optional uint64 execution_time_in_microseconds = 4;
+  optional uint64 execution_end_timestamp = 5;  // Indicates time elapsed in 
+                                                // microseconds since epoch, 
+                                                // when the work order execution
+                                                // got over. 
 }
 
 message CatalogRelationNewBlockMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 2b95f1f..95ebc8c 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 6ba27f1..6a1cfc0 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -123,11 +123,16 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   const uint64_t execution_time_microseconds =
       std::chrono::duration_cast<std::chrono::microseconds>(end - start)
           .count();
+  const uint64_t execution_end_timestamp =
+      std::chrono::duration_cast<std::chrono::microseconds>(
+          end.time_since_epoch())
+          .count();
   // Construct the proto message.
   proto->set_operator_index(worker_message.getRelationalOpIndex());
   proto->set_query_id(query_id_for_workorder);
   proto->set_worker_thread_index(worker_thread_index_);
   proto->set_execution_time_in_microseconds(execution_time_microseconds);
+  proto->set_execution_end_timestamp(execution_end_timestamp);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/de19aeb6/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 92baf19..0fef398 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -20,6 +20,7 @@
 #include <cstdio>
 #include <set>
 #include <string>
+#include <vector>
 
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"


[03/21] incubator-quickstep git commit: Added GFLAG to learner.

Posted by hb...@apache.org.
Added GFLAG to learner.

- To control the number of work order execution statistics
  that are maintained in the Learner for a given query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/78830e53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/78830e53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/78830e53

Branch: refs/heads/scheduler++
Commit: 78830e53ba8f71204b000b87e6c141892fd81e20
Parents: e7e497e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 11:39:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                | 25 +++++++-
 query_execution/Learner.hpp                | 76 ++++++++++++-------------
 query_execution/tests/Learner_unittest.cpp | 17 +++---
 3 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/78830e53/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 38a773b..0f17e7a 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -34,6 +34,11 @@
 
 namespace quickstep {
 
+DEFINE_uint64(max_past_entries_learner,
+              10,
+              "The maximum number of past WorkOrder execution statistics"
+              " entries for a query");
+
 void Learner::addCompletionFeedback(
     const serialization::NormalWorkOrderCompletionMessage
         &workorder_completion_proto) {
@@ -90,8 +95,7 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
   }
 }
 
-void Learner::updateProbabilitiesOfAllPriorityLevels(
-    const std::size_t priority_level) {
+void Learner::updateProbabilitiesOfAllPriorityLevels() {
   if (!hasFeedbackFromAllPriorityLevels() ||
       has_feedback_from_all_queries_.empty()) {
     // Either we don't have enough feedback messages from all the priority
@@ -114,7 +118,7 @@ void Learner::updateProbabilitiesOfAllPriorityLevels(
       total_time_curr_level += mean_workorder_entry.second;
     }
     const std::size_t num_queries_in_priority_level =
-        execution_stats_[priority_level].size();
+        execution_stats_[curr_priority_level].size();
     DCHECK_GT(num_queries_in_priority_level, 0u);
     predicted_time_for_level[curr_priority_level] =
         total_time_curr_level / num_queries_in_priority_level;
@@ -195,4 +199,19 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   }
 }
 
+void Learner::initializeQuery(const QueryHandle &query_handle) {
+  const std::size_t priority_level = query_handle.query_priority();
+  const std::size_t query_id = query_handle.query_id();
+  DCHECK(isPriorityLevelPresent(priority_level));
+  query_id_to_priority_lookup_[query_id] = priority_level;
+  // TODO(harshad) - Create a gflag for max_past_entries_learner.
+  execution_stats_[priority_level].emplace_back(
+      query_id,
+      std::unique_ptr<ExecutionStats>(
+          new ExecutionStats(FLAGS_max_past_entries_learner)));
+  // As we are initializing the query, we obviously haven't gotten any
+  // feedback message for this query. Hence mark the following field as false.
+  has_feedback_from_all_queries_[priority_level] = false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/78830e53/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index fb0e4cb..073b693 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -30,15 +30,10 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
-#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 namespace quickstep {
 
-/*DECLARE_int32(max_past_entries_learner,
-              10,
-              "The maximum number of past WorkOrder execution statistics"
-              " entries for a query");*/
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -94,14 +89,6 @@ class Learner {
     }
   }
 
-  void updateProbabilitiesForQueriesInPriorityLevel(
-      const std::size_t priority_level, const std::size_t query_id);
-
-  // TODO(harshad) - Cache internal results from previous invocation of this
-  // function and reuse them. There's a lot of redundancy in computations
-  // at this point.
-  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
-
   inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
@@ -122,6 +109,29 @@ class Learner {
 
  private:
   /**
+   * @brief Update the probabilities for queries in the given priority level.
+   *
+   * @note This function is called after the learner receives a completion
+   *       feedback message from a given query.
+   *
+   * @param priority_level The priority level.
+   * @param query_id The ID of the query for which a completion feedback message
+   *        has been received.
+   *
+   **/
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  /**
+   * @brief Update the probabilities of all the priority levels.
+   *
+   * TODO(harshad) - Cache internal results from previous invocation of this
+   * function and reuse them. There's a lot of redundancy in computations
+   * at this point.
+   **/
+  void updateProbabilitiesOfAllPriorityLevels();
+
+  /**
    * @brief Initialize the default probabilities for the queries.
    **/
   void initializeDefaultProbabilitiesForAllQueries();
@@ -135,18 +145,14 @@ class Learner {
    * @brief Initialize the data structures for a given priority level, if none
    *        exist. If there are already data structures for the given priority
    *        level, do nothing.
+   *
+   * @note This function should be followed by a relearn() call, to insert this
+   *       priority levels in probabilities_of_priority_levels_.
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
-      // Calculate the default probability for the priority level here and use
-      // it instead of 0.5 here.
-      // TODO(harshad) - Correct this.
-      /*const float new_denominator =
-          probabilities_of_priority_levels_[priority_level]->getDenominator();
-      probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
-          priority_level, priority_level, new_denominator);*/
       execution_stats_[priority_level];
     }
   }
@@ -169,6 +175,10 @@ class Learner {
 
   /**
    * @brief Check if the Learner has presence of the given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return True if present, false otherwise.
    **/
   inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
     DCHECK_EQ((current_probabilities_.find(priority_level) ==
@@ -178,7 +188,7 @@ class Learner {
   }
 
   /**
-   * @brief Check if the query is present.
+   * @brief Check if the query is present in local data structures.
    **/
   inline bool isQueryPresent(const std::size_t query_id) const {
     return query_id_to_priority_lookup_.find(query_id) !=
@@ -190,20 +200,7 @@ class Learner {
    *
    * @param query_handle The query handle for the new query.
    **/
-  void initializeQuery(const QueryHandle &query_handle) {
-    const std::size_t priority_level = query_handle.query_priority();
-    const std::size_t query_id = query_handle.query_id();
-    DCHECK(isPriorityLevelPresent(priority_level));
-    query_id_to_priority_lookup_[query_id] = priority_level;
-    execution_stats_[priority_level].emplace_back(
-        query_id,
-        std::unique_ptr<ExecutionStats>(
-            // new ExecutionStats(FLAGS_max_past_entries_learner)));
-            new ExecutionStats(10)));
-    // As we are initializing the query, we obviously haven't gotten any
-    // feedback message for this query. Hence mark the following field as false.
-    has_feedback_from_all_queries_[priority_level] = false;
-  }
+  void initializeQuery(const QueryHandle &query_handle);
 
   /**
    * @brief Get the execution stats object for the given query.
@@ -222,9 +219,10 @@ class Learner {
   }
 
   /**
-   * @brief This function works well when the query and priority level exists
-   *        in the data structures.
+   * @breif Get a mutable iterator to the execution stats for a given query.
    *
+   * @note This function works well when the query and priority level exists
+   *       in the data structures.
    **/
   inline std::vector<
       std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
@@ -244,6 +242,9 @@ class Learner {
     return stats_iter;
   }
 
+  /**
+   * @brief Get a query's priority level given its ID.
+   **/
   inline const std::size_t getQueryPriority(const std::size_t query_id) const {
     const auto it = query_id_to_priority_lookup_.find(query_id);
     DCHECK(it != query_id_to_priority_lookup_.end());
@@ -366,7 +367,6 @@ class Learner {
 
   // Key = priority level. Value = A boolean that indicates if we have received
   // feedback from all the queries in the given priority level.
-  // TODO(harshad) - Invalidate the cache whenever needed.
   std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
   DISALLOW_COPY_AND_ASSIGN(Learner);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/78830e53/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 864bb22..a1a144d 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -27,28 +27,29 @@ namespace quickstep {
 TEST(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
-  const std::size_t kPriorityLevel1 = 1;
-  handle.reset(new QueryHandle(1, kPriorityLevel1));
+  const std::size_t kPriorityLevel = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel));
 
   EXPECT_FALSE(learner.hasActiveQueries());
   learner.addQuery(*handle);
   EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_TRUE(learner.hasActiveQueries());
+
   learner.removeQuery(handle->query_id());
   EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_FALSE(learner.hasActiveQueries());
 
-  const std::size_t kPriorityLevel2 = 1;
-  handle.reset(new QueryHandle(1, kPriorityLevel2));
+  handle.reset(new QueryHandle(2, kPriorityLevel));
   learner.addQuery(*handle);
   EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_TRUE(learner.hasActiveQueries());
+
   learner.removeQuery(handle->query_id());
   EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_FALSE(learner.hasActiveQueries());
 }
 


[02/21] incubator-quickstep git commit: API to find the highest priority level in the learner

Posted by hb...@apache.org.
API to find the highest priority level in the learner

- Unit tests to test the feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/996ca75b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/996ca75b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/996ca75b

Branch: refs/heads/scheduler++
Commit: 996ca75b4368ee0fc338af5707d52bfd62ef7389
Parents: 2d93312
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             | 12 +++--
 query_execution/Learner.cpp                | 33 +++++++++++++
 query_execution/Learner.hpp                | 30 +++++++-----
 query_execution/ProbabilityStore.hpp       | 10 ++--
 query_execution/QueryExecutionTypedefs.hpp |  2 +
 query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
 6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
                       glog
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
-                      ${GFLAGS_LIB_NAME} 
+                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_ProbabilityStore
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_Learner
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest) 
+add_test(Learner_unittest Learner_unittest)
 
 add_executable(ProbabilityStore_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest) 
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
 
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
               "The maximum number of past WorkOrder execution statistics"
               " entries for a query");
 
+Learner::Learner()
+    : highest_priority_level_(kInvalidPriorityLevel) {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+}
+
 void Learner::addCompletionFeedback(
     const serialization::NormalWorkOrderCompletionMessage
         &workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
   has_feedback_from_all_queries_[priority_level] = false;
 }
 
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    execution_stats_.erase(priority_level);
+    current_probabilities_.erase(priority_level);
+    probabilities_of_priority_levels_->removeObject(priority_level);
+    has_feedback_from_all_queries_.erase(priority_level);
+    if (hasActiveQueries()) {
+      if (static_cast<int>(priority_level) == highest_priority_level_) {
+        // The priority level to be removed is the highest priority level.
+        std::size_t new_highest_priority_level = 0;
+        // Find the new highest priority level.
+        for (auto priority_level_it = execution_stats_.cbegin();
+             priority_level_it != execution_stats_.cend();
+             ++priority_level_it) {
+          if (priority_level_it->first > new_highest_priority_level) {
+            new_highest_priority_level = priority_level_it->first;
+          }
+        }
+        highest_priority_level_ = static_cast<int>(new_highest_priority_level);
+      }
+    } else {
+      highest_priority_level_ = kInvalidPriorityLevel;
+    }
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
   /**
    * @brief Constructor.
    **/
-  Learner() {
-    probabilities_of_priority_levels_.reset(new ProbabilityStore());
-  }
+  Learner();
 
   void addCompletionFeedback(
       const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
     return query_id_to_priority_lookup_.size();
   }
 
+  /**
+   * @brief Get the highest priority level among the active queries.
+   *
+   * @return The highest priority level. If the system is empty it returns
+   *         kInvalidPriorityLevel.
+   **/
+  inline const int getHighestPriorityLevel() const {
+    return highest_priority_level_;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
+    CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
       execution_stats_[priority_level];
+      if (static_cast<int>(priority_level) > highest_priority_level_) {
+        highest_priority_level_ = priority_level;
+      }
     }
   }
 
@@ -163,15 +175,7 @@ class Learner {
    *
    * @param priority_level The priority level.
    **/
-  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
-    DCHECK(isPriorityLevelPresent(priority_level));
-    if (execution_stats_[priority_level].empty()) {
-      execution_stats_.erase(priority_level);
-      current_probabilities_.erase(priority_level);
-      probabilities_of_priority_levels_->removeObject(priority_level);
-      has_feedback_from_all_queries_.erase(priority_level);
-    }
-  }
+  void checkAndRemovePriorityLevel(const std::size_t priority_level);
 
   /**
    * @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
   // feedback from all the queries in the given priority level.
   std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
+  int highest_priority_level_;
+
   DISALLOW_COPY_AND_ASSIGN(Learner);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
                                              cumulative_probability);
       cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that
+    DCHECK(!cumulative_probabilities_.empty());
+    // Adjust the last cumulative probability manually to 1, so that
     // floating addition related rounding issues are ignored.
-    cumulative_probabilities_.back().updateProbability(1.0);
+    cumulative_probabilities_.back().updateProbability(1);
   }
 
   /**
@@ -233,7 +234,9 @@ class ProbabilityStore {
    public:
     ProbabilityInfo(const std::size_t property, const float probability)
         : property_(property), probability_(probability) {
-      DCHECK_LE(probability, 1.0);
+      // As GLOG doesn't provide DEBUG only checks for less than equal
+      // comparison for floats, we can't ensure that probability is less than
+      // 1.0.
     }
 
     ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
     ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
 
     void updateProbability(const float new_probability) {
-      DCHECK_LE(new_probability, 1.0);
       probability_ = new_probability;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
 typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
+const int kInvalidPriorityLevel = -1;
+
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',
                                      'l',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
  *   limitations under the License.
  **/
 
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
 #include <memory>
+#include <random>
+#include <vector>
 
 #include "gtest/gtest.h"
 
 #include "query_execution/Learner.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 
 namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
     }
   }
 }
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t max_priority_so_far =
+        *(std::max_element(priorities_insertion_order.begin(), it + 1));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+  }
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t max_priority_so_far =
+        *(std::max_element(it, priorities_removal_order.end()));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
 }  // namespace quickstep


[14/21] incubator-quickstep git commit: Created unit test for Learner

Posted by hb...@apache.org.
Created unit test for Learner

- API changes for probability store.
- Check if there's a probability entry for the query to be removed.
- Bug fix in remove query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/75876845
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/75876845
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/75876845

Branch: refs/heads/scheduler++
Commit: 758768452a044940bb00b8a721380f3fa5aedb47
Parents: fe319ae
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 23:03:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  9 ++++
 query_execution/Learner.cpp                |  6 ++-
 query_execution/Learner.hpp                | 33 ++++++++++-----
 query_execution/ProbabilityStore.hpp       |  5 +++
 query_execution/tests/Learner_unittest.cpp | 55 +++++++++++++++++++++++++
 5 files changed, 96 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75876845/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index cb0f815..3904185 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -268,6 +268,15 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
+add_executable(Learner_unittest
+  "${CMAKE_CURRENT_SOURCE_DIR}/tests/Learner_unittest.cpp")
+target_link_libraries(Learner_unittest
+                      gtest
+                      gtest_main
+                      quickstep_queryexecution_Learner
+                      quickstep_queryoptimizer_QueryHandle)
+add_test(Learner_unittest Learner_unittest) 
+
 add_executable(ProbabilityStore_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
 target_link_libraries(ProbabilityStore_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75876845/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 72c68f0..5d877b4 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -188,8 +188,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
     priority_levels.emplace_back(priority_iter->first);
     numerators.emplace_back(priority_iter->first);
   }
-  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
-      priority_levels, numerators, sum_priority_levels);
+  if (sum_priority_levels > 0) {
+    probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+        priority_levels, numerators, sum_priority_levels);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75876845/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 64120a7..9d51877 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -35,10 +35,10 @@
 
 namespace quickstep {
 
-DEFINE_int32(max_past_entries_learner,
+/*DECLARE_int32(max_past_entries_learner,
               10,
               "The maximum number of past WorkOrder execution statistics"
-              " entries for a query");
+              " entries for a query");*/
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -49,6 +49,7 @@ class Learner {
    * @brief Constructor.
    **/
   Learner() {
+    probabilities_of_priority_levels_.reset(new ProbabilityStore());
   }
 
   void addCompletionFeedback(
@@ -67,7 +68,15 @@ class Learner {
     const std::size_t priority_level = getQueryPriority(query_id);
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
-    current_probabilities_[priority_level]->removeObject(query_id);
+    DCHECK(current_probabilities_.find(priority_level) !=
+           current_probabilities_.end());
+    if (current_probabilities_[priority_level]->hasObject(query_id)) {
+      // We may have cases when a query doesn't produce any feedback message,
+      // therefore we may not have an entry for this query in the
+      // current_probabilities_[priority_level] ProbabilityStore.
+      current_probabilities_[priority_level]->removeObject(query_id);
+    }
+    query_id_to_priority_lookup_.erase(query_id);
     checkAndRemovePriorityLevel(priority_level);
     relearn();
   }
@@ -93,6 +102,10 @@ class Learner {
   // at this point.
   void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
 
+  inline const std::size_t hasActiveQueries() const {
+    return !query_id_to_priority_lookup_.empty();
+  }
+
  private:
   /**
    * @brief Initialize the default probabilities for the queries.
@@ -111,12 +124,15 @@ class Learner {
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
-    if (isPriorityLevelPresent(priority_level)) {
+    if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
       // Calculate the default probability for the priority level here and use
       // it instead of 0.5 here.
       // TODO(harshad) - Correct this.
-      probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+      /*const float new_denominator =
+          probabilities_of_priority_levels_[priority_level]->getDenominator();
+      probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
+          priority_level, priority_level, new_denominator);*/
       execution_stats_[priority_level];
     }
   }
@@ -168,7 +184,8 @@ class Learner {
     execution_stats_[priority_level].emplace_back(
         query_id,
         std::unique_ptr<ExecutionStats>(
-            new ExecutionStats(FLAGS_max_past_entries_learner)));
+            // new ExecutionStats(FLAGS_max_past_entries_learner)));
+            new ExecutionStats(10)));
     // As we are initializing the query, we obviously haven't gotten any
     // feedback message for this query. Hence mark the following field as false.
     has_feedback_from_all_queries_[priority_level] = false;
@@ -251,10 +268,6 @@ class Learner {
     has_feedback_from_all_queries_[priority_level] = true;
   }
 
-  inline const std::size_t hasActiveQueries() const {
-    return !query_id_to_priority_lookup_.empty();
-  }
-
   /**
    * @brief Get the mean work order execution times for all the queries in
    *        a given priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75876845/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index d31caa6..347df89 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -55,6 +55,11 @@ class ProbabilityStore {
     return common_denominator_;
   }
 
+  inline bool hasObject(const std::size_t property) const {
+    const auto entry = individual_probabilities_.find(property);
+    return entry != individual_probabilities_.end();  // Stored for property?
+  }
+
   /**
    * @brief Add individual (not cumulative) probability for a given object.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75876845/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
new file mode 100644
index 0000000..cab241a
--- /dev/null
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -0,0 +1,55 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <memory>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/Learner.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+namespace quickstep {
+
+TEST(LearnerTest, AddQueryTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle;
+  handle.reset(new QueryHandle(1, 1));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+}
+
+TEST(LearnerTest, RemoveQueryTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle;
+  handle.reset(new QueryHandle(1, 1));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_FALSE(learner.hasActiveQueries());
+
+  handle.reset(new QueryHandle(2, 1));
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_FALSE(learner.hasActiveQueries());
+}
+
+}  // namespace quickstep


[12/21] incubator-quickstep git commit: Get number of active queries (total and by priority level)

Posted by hb...@apache.org.
Get number of active queries (total and by priority level)

- Unit tests to check the feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/083d17d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/083d17d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/083d17d6

Branch: refs/heads/scheduler++
Commit: 083d17d686f5c49ebfa09d74fdf6f0d7f27c89c4
Parents: 7587684
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:42:56 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.hpp                | 16 +++++++-
 query_execution/tests/Learner_unittest.cpp | 54 ++++++++++++++++++++-----
 2 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/083d17d6/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 9d51877..fb0e4cb 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -102,10 +102,24 @@ class Learner {
   // at this point.
   void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
 
-  inline const std::size_t hasActiveQueries() const {
+  inline bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
 
+  inline std::size_t getNumActiveQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    // A level with no execution_stats_ entry has no active queries.
+    const auto it = execution_stats_.find(priority_level);
+    if (it == execution_stats_.end()) {
+      return 0;
+    }
+    return it->second.size();
+  }
+
+  inline std::size_t getTotalNumActiveQueries() const {
+    return query_id_to_priority_lookup_.size();
+  }
+
  private:
   /**
    * @brief Initialize the default probabilities for the queries.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/083d17d6/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index cab241a..74353f0 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -24,32 +24,64 @@
 
 namespace quickstep {
 
-TEST(LearnerTest, AddQueryTest) {
+TEST(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
-  handle.reset(new QueryHandle(1, 1));
+  const std::size_t kPriorityLevel1 = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel1));
 
   EXPECT_FALSE(learner.hasActiveQueries());
   learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
   EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_FALSE(learner.hasActiveQueries());
+
+  const std::size_t kPriorityLevel2 = 2;  // Must differ from kPriorityLevel1.
+  handle.reset(new QueryHandle(1, kPriorityLevel2));
+  learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_FALSE(learner.hasActiveQueries());
 }
 
-TEST(LearnerTest, RemoveQueryTest) {
+TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   Learner learner;
-  std::unique_ptr<QueryHandle> handle;
-  handle.reset(new QueryHandle(1, 1));
+  std::unique_ptr<QueryHandle> handle1, handle2;
+  const std::size_t kPriorityLevel = 1;
+  handle1.reset(new QueryHandle(1, kPriorityLevel));
+  handle2.reset(new QueryHandle(2, kPriorityLevel));
 
   EXPECT_FALSE(learner.hasActiveQueries());
-  learner.addQuery(*handle);
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  learner.addQuery(*handle1);
   EXPECT_TRUE(learner.hasActiveQueries());
-  learner.removeQuery(handle->query_id());
-  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+  learner.addQuery(*handle2);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 
-  handle.reset(new QueryHandle(2, 1));
-  learner.addQuery(*handle);
+  learner.removeQuery(handle1->query_id());
   EXPECT_TRUE(learner.hasActiveQueries());
-  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  learner.removeQuery(handle2->query_id());
+
   EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
 }  // namespace quickstep


[05/21] incubator-quickstep git commit: Added ExecutionStats class

Posted by hb...@apache.org.
Added ExecutionStats class

- To keep track of query execution statistics for a given query.
- The stats class organizes execution time on a per-operator basis.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/be92212f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/be92212f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/be92212f

Branch: refs/heads/scheduler++
Commit: be92212ff45c77d0f19c6b2e926fc3dc6a561f0f
Parents: 31f1bbb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 19 10:06:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt     |   6 ++
 query_execution/ExecutionStats.hpp | 177 ++++++++++++++++++++++++++++++++
 query_execution/PolicyEnforcer.hpp |   2 +-
 3 files changed, 184 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/be92212f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..fcd4f48 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -32,6 +32,7 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -69,6 +70,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_Macros
                         tmb)
 endif()
+target_link_libraries(quickstep_queryexecution_ExecutionStats
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
                       ${GFLAGS_LIB_NAME} 
                       glog
@@ -91,6 +95,7 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
+                      quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -199,6 +204,7 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
 add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
 target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_AdmitRequestMessage
+                      quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
                       quickstep_queryexecution_PolicyEnforcer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/be92212f/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
new file mode 100644
index 0000000..f28f367
--- /dev/null
+++ b/query_execution/ExecutionStats.hpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief Record the execution stats of a query.
+ **/
+class ExecutionStats {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param max_entries The maximum number of entries we remember for each
+   *        operator. Must be positive.
+   **/
+  explicit ExecutionStats(const std::size_t max_entries)
+      : max_entries_(max_entries), cached_stats_(std::make_pair(0, 0)) {}
+
+  /**
+   * @brief Get the number of active operators in stats.
+   **/
+  std::size_t getNumActiveOperators() const {
+    return active_operators_.size();
+  }
+
+  /**
+   * @brief Get the current stats.
+   *
+   * @note This function updates the cache, hence it can't be const. We are lazy
+   *       in updating the cache, instead of eagerly updating the cache upon
+   *       each update.
+   *
+   * @return A pair - 1st element is total time, 2nd element is total number of
+   *         WorkOrders for the whole query.
+   **/
+  std::pair<std::uint64_t, std::uint64_t> getCurrentStats() {
+    if (active_operators_.empty()) {
+      return cached_stats_;
+    }
+    std::pair<std::uint64_t, std::uint64_t> result = std::make_pair(0, 0);
+    for (const auto &op : active_operators_) {
+      DCHECK(op.second != nullptr);
+      const std::pair<std::uint64_t, std::size_t> op_stats = op.second->getStats();
+      result.first += op_stats.first;
+      result.second += op_stats.second;
+    }
+    if (result.first == 0 || result.second == 0) {
+      // If either element of the pair is 0, keep using the old result.
+      return cached_stats_;
+    }
+    cached_stats_ = result;
+    return result;
+  }
+
+  /**
+   * @brief Add a new entry to stats.
+   *
+   * @param value The value to be added.
+   * @param operator_index The operator index which the value belongs to.
+   **/
+  void addEntry(std::size_t value, std::size_t operator_index) {
+    if (!hasOperator(operator_index)) {
+      // First entry for the given operator: create its OperatorStats object.
+      active_operators_[operator_index] =
+          std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
+    }
+    // Always record the value, including the first one for a new operator.
+    active_operators_[operator_index]->addEntry(value);
+  }
+
+  /**
+   * @brief Remove the operator with given index. This should be called only
+   *        when the given operator finishes its execution.
+   **/
+  void removeOperator(std::size_t operator_index) {
+    DCHECK(hasOperator(operator_index));
+    active_operators_.erase(operator_index);
+  }
+
+ private:
+  /**
+   * @brief Stats for an operator within the query.
+   *
+   * @note We remember only the last N entries for the operator.
+   **/
+  class OperatorStats {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param max_entries The maximum number of entries we remember. Typically
+     *        these are the last N (=max_entries) entries. Must be positive.
+     **/
+    explicit OperatorStats(const std::size_t max_entries) : max_entries_(max_entries) {
+      // Zero would make addEntry() call pop_front() on an empty deque.
+      DCHECK_GT(max_entries, 0u);
+    }
+
+    inline std::pair<std::uint64_t, std::size_t> getStats() const {
+      // Sum in 64 bits; std::accumulate with an int seed of 0 would truncate.
+      std::uint64_t total_time = 0;
+      for (const std::uint64_t time_value : times_) total_time += time_value;
+      return std::make_pair(total_time, times_.size());
+    }
+
+    inline void addEntry(std::uint64_t time_value) {
+      if (times_.size() == max_entries_) {
+        times_.pop_front();
+      }
+      times_.push_back(time_value);
+      DCHECK_LE(times_.size(), max_entries_);
+    }
+
+   private:
+    const std::size_t max_entries_;
+    std::deque<std::uint64_t> times_;
+
+    DISALLOW_COPY_AND_ASSIGN(OperatorStats);
+  };
+
+  /**
+   * @brief Check if the operator with given index is present in the stats.
+   **/
+  inline bool hasOperator(const std::size_t operator_index) const {
+    return active_operators_.find(operator_index) != active_operators_.end();
+  }
+
+  const std::size_t max_entries_;
+
+  std::unordered_map<std::size_t, std::unique_ptr<OperatorStats>>
+      active_operators_;
+
+  // Cached stats for the whole query.
+  std::pair<std::uint64_t, std::uint64_t> cached_stats_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExecutionStats);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/be92212f/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..b7f5735 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -214,4 +214,4 @@ class PolicyEnforcer {
 
 }  // namespace quickstep
 
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_


[11/21] incubator-quickstep git commit: Bug fixed in probability store.

Posted by hb...@apache.org.
Bug fixed in probability store.

- Correct calculation for cumulative probabilities after removing an
  object.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c64bf757
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c64bf757
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c64bf757

Branch: refs/heads/scheduler++
Commit: c64bf7574d82b774429eb670b3769337ad9f6d86
Parents: c17529a
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 26 09:35:11 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/ProbabilityStore.hpp              |  4 ++--
 .../tests/ProbabilityStore_unittest.cpp           | 18 +++++++++---------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c64bf757/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index ed52f75..7278e2b 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -173,7 +173,7 @@ class ProbabilityStore {
       }
       CHECK_GT(new_denominator, 0);
       common_denominator_ = new_denominator;
-      updateCumulativeProbabilities();
+      updateProbabilitiesNewDenominator();
     } else {
       // In order to keep the store consistent, we should keep the sizes of
       // individual_probabilities_ and cumulative_probabilities_ the same.
@@ -208,9 +208,9 @@ class ProbabilityStore {
     }
     float cumulative_probability = 0;
     for (const auto p : individual_probabilities_) {
+      cumulative_probability += p.second.second;
       cumulative_probabilities_.emplace_back(p.first,
                                              cumulative_probability);
-      cumulative_probability += p.second.second;
     }
     DCHECK(!cumulative_probabilities_.empty());
     // Adjust the last cumulative probability manually to 1, so that

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c64bf757/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index dcec1e5..518699e 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -34,8 +34,8 @@ TEST(ProbabilityStoreTest, CountTest) {
   EXPECT_EQ(0u, store.getNumObjects());
 
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> numerators {1, 2, 3, 5};
-  const std::size_t kNewDenominator = 10;
+  std::vector<float> numerators {1, 2, 2, 5};
+  const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
   store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   EXPECT_EQ(objects.size(), store.getNumObjects());
@@ -44,8 +44,8 @@ TEST(ProbabilityStoreTest, CountTest) {
 TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> numerators {1, 2, 3, 5};
-  const std::size_t kNewDenominator = 10;
+  std::vector<float> numerators {1, 2, 2, 5};
+  const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
   store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
@@ -57,8 +57,8 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
 TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> numerators {1, 2, 3, 5};
-  const std::size_t kNewDenominator = 10;
+  std::vector<float> numerators {1, 2, 2, 5};
+  const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
   store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   const std::size_t kNumTrials = 10;
@@ -78,8 +78,8 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
 TEST(ProbabilityStoreTest, RemoveObjectTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> numerators {1, 2, 3, 5};
-  const std::size_t kNewDenominator = 10;
+  std::vector<float> numerators {1, 2, 2, 5};
+  const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
   store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
@@ -96,7 +96,7 @@ TEST(ProbabilityStoreTest, RemoveObjectTest) {
 
   EXPECT_EQ(expected_new_denominator, store.getDenominator());
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
-    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(expected_new_denominator),
               store.getIndividualProbability(objects[object_num]));
   }
 }


[07/21] incubator-quickstep git commit: Added test for adding completion message feedback.

Posted by hb...@apache.org.
Added test for adding completion message feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2d933121
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2d933121
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2d933121

Branch: refs/heads/scheduler++
Commit: 2d9331214cbedef428a131c6ec95325257ddcaca
Parents: 78830e5
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 12:09:22 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  1 +
 query_execution/Learner.cpp                |  4 +-
 query_execution/tests/Learner_unittest.cpp | 82 ++++++++++++++++++++++++-
 3 files changed, 82 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d933121/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 3904185..ef1ce99 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -274,6 +274,7 @@ target_link_libraries(Learner_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_Learner
+                      quickstep_queryexecution_QueryExecutionMessages_proto                      
                       quickstep_queryoptimizer_QueryHandle)
 add_test(Learner_unittest Learner_unittest) 
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d933121/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 0f17e7a..c7a7064 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -51,10 +51,11 @@ void Learner::addCompletionFeedback(
       workorder_completion_proto.execution_time_in_microseconds(),
       workorder_completion_proto.operator_index());
 
-  // updateProbability();
   if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
     updateFeedbackFromQueriesInPriorityLevel(priority_level);
   }
+  updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
+  updateProbabilitiesOfAllPriorityLevels();
 }
 
 void Learner::updateProbabilitiesForQueriesInPriorityLevel(
@@ -67,7 +68,6 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     return;
   } else if (execution_stats_[priority_level].size() == 1u) {
     DCHECK(current_probabilities_[priority_level] != nullptr);
-    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
     // As we want the probability of the lone query in this priority level as
     // 1, we set the numerator same as denominator.
     const std::size_t numerator =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d933121/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index a1a144d..556c984 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -20,11 +20,26 @@
 #include "gtest/gtest.h"
 
 #include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_optimizer/QueryHandle.hpp"
 
 namespace quickstep {
 
-TEST(LearnerTest, AddAndRemoveQueryTest) {
+class LearnerTest : public ::testing::Test {
+ protected:
+  serialization::NormalWorkOrderCompletionMessage createMockCompletionMessage(
+      const std::size_t query_id, const std::size_t operator_id) {
+    serialization::NormalWorkOrderCompletionMessage mock_proto_message;
+    mock_proto_message.set_operator_index(operator_id);
+    mock_proto_message.set_query_id(query_id);
+    mock_proto_message.set_worker_thread_index(0);
+    mock_proto_message.set_execution_time_in_microseconds(10);
+
+    return mock_proto_message;
+  }
+};
+
+TEST_F(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
   const std::size_t kPriorityLevel = 1;
@@ -53,7 +68,7 @@ TEST(LearnerTest, AddAndRemoveQueryTest) {
   EXPECT_FALSE(learner.hasActiveQueries());
 }
 
-TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle1, handle2;
   const std::size_t kPriorityLevel = 1;
@@ -85,7 +100,7 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
-TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle1, handle2;
   const std::size_t kPriorityLevel1 = 1;
@@ -123,4 +138,65 @@ TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
 }
 
+TEST_F(LearnerTest, AddCompletionFeedbackSamePriorityLevelTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle1, handle2;
+  const std::size_t kPriorityLevel = 1;
+  handle1.reset(new QueryHandle(1, kPriorityLevel));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  learner.addQuery(*handle1);
+  serialization::NormalWorkOrderCompletionMessage completion_message =
+      createMockCompletionMessage(handle1->query_id(), 0);
+
+  learner.addCompletionFeedback(completion_message);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  handle2.reset(new QueryHandle(2, kPriorityLevel));
+  learner.addQuery(*handle2);
+  completion_message = createMockCompletionMessage(handle2->query_id(), 0);
+  learner.addCompletionFeedback(completion_message);
+
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+}
+
+TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle1, handle2;
+  const std::size_t kPriorityLevel1 = 1;
+  const std::size_t kPriorityLevel2 = 2;
+  handle1.reset(new QueryHandle(1, kPriorityLevel1));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  learner.addQuery(*handle1);
+
+  handle2.reset(new QueryHandle(2, kPriorityLevel2));
+  learner.addQuery(*handle2);
+
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+  const std::size_t kNumIterations = 10;
+  std::vector<QueryHandle*> handles;
+  handles.emplace_back(handle1.get());
+  handles.emplace_back(handle2.get());
+  for (std::size_t iter_num = 0; iter_num < kNumIterations; ++iter_num) {
+    for (std::size_t index = 0; index < handles.size(); ++index) {
+      EXPECT_TRUE(learner.hasActiveQueries());
+      serialization::NormalWorkOrderCompletionMessage completion_message =
+        createMockCompletionMessage(handles[index]->query_id(), 0);
+      learner.addCompletionFeedback(completion_message);
+      EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+      EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+      EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+    }
+  }
+}
 }  // namespace quickstep


[19/21] incubator-quickstep git commit: More conditions and logging in PolicyEnforcer.

Posted by hb...@apache.org.
More conditions and logging in PolicyEnforcer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7074e439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7074e439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7074e439

Branch: refs/heads/scheduler++
Commit: 7074e4399be0e1fa75ee9947cc0f1e4ceda8957d
Parents: ca34819
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 14:35:53 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 query_execution/PriorityPolicyEnforcer.cpp | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7074e439/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 6467367..f9a741d 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -174,18 +174,26 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     // While there are more priority levels to be checked ..
     while (checked_priority_levels.size() < priority_query_ids_.size()) {
       const int chosen_priority_level = learner_->pickRandomPriorityLevel();
-      DCHECK(chosen_priority_level != kInvalidPriorityLevel);
-      WorkerMessage *next_worker_message =
-          getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
-                                                &finished_queries_ids);
-      if (next_worker_message != nullptr) {
-        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+      if (chosen_priority_level == kInvalidPriorityLevel) {
+        LOG(INFO) << "No valid priority level chosen";
+        break;
+      } else if (checked_priority_levels.find(static_cast<std::size_t>(
+                     chosen_priority_level)) != checked_priority_levels.end()) {
+        DLOG(INFO) << "The chosen priority level " << chosen_priority_level << " was checked already";
+        continue;
       } else {
-        checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
+        WorkerMessage *next_worker_message =
+            getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+                                                  &finished_queries_ids);
+        if (next_worker_message != nullptr) {
+          worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+        } else {
+          checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
+        }
       }
     }
   } else {
-    LOG(INFO) << "No active queries in the learner at this point.";
+    DLOG(INFO) << "No active queries in the learner at this point.";
     return;
   }
   for (const std::size_t finished_qid : finished_queries_ids) {


[21/21] incubator-quickstep git commit: AdmitRequest message function accepts multiple queries.

Posted by hb...@apache.org.
AdmitRequest message function accepts multiple queries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fb05e6ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fb05e6ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fb05e6ec

Branch: refs/heads/scheduler++
Commit: fb05e6ecacdd80412201e048aa53fd26af19475c
Parents: 7074e43
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 15:10:51 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:11 2016 -0500

----------------------------------------------------------------------
 cli/CommandExecutor.cpp                                | 4 +++-
 cli/QuickstepCli.cpp                                   | 4 +++-
 query_execution/AdmitRequestMessage.hpp                | 5 +++--
 query_execution/QueryExecutionUtil.hpp                 | 4 ++--
 query_optimizer/tests/ExecutionGeneratorTestRunner.cpp | 4 +++-
 5 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fb05e6ec/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7083ef5..440a30f 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -216,9 +216,11 @@ inline TypedValue executeQueryForSingleResult(
       query_processor->generateQueryHandle(*result.parsed_statement));
   DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
+  std::vector<QueryHandle*> query_handles;
+  query_handles.push_back(query_handle.get());
   // Use foreman to execute the query plan.
   QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-      main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+      main_thread_client_id, foreman_client_id, &query_handles, bus);
 
   QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fb05e6ec/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 02a55a0..c34e389 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -433,11 +433,13 @@ int main(int argc, char* argv[]) {
         }
 
         DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+        std::vector<QueryHandle*> query_handles;
+        query_handles.push_back(query_handle.get());
         start = std::chrono::steady_clock::now();
         QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
             main_thread_client_id,
             foreman.getBusClientID(),
-            query_handle.get(),
+            &query_handles,
             &bus);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fb05e6ec/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index e33b354..0aefcbf 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -41,8 +41,9 @@ class AdmitRequestMessage {
    * @param query_handles The handles of the queries requesting to be admitted
    *        to the system.
    **/
-  explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
-      : query_handles_(query_handles) {}
+  explicit AdmitRequestMessage(std::vector<QueryHandle*> *query_handles)
+      : query_handles_(*DCHECK_NOTNULL(query_handles)) {
+  }
 
   /**
    * @brief Constructor for requesting single query admission.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fb05e6ec/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 6ea4a29..2b95f1f 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -74,7 +74,7 @@ class QueryExecutionUtil {
    *
    * @param sender_id The TMB client ID of the sender.
    * @param receiver_id The TMB client ID of the receiver.
-   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+   * @param query_handle The QueryHandles used in the AdmitRequestMessage.
    * @param bus A pointer to the TMB.
    * @param tagged_message A moved from reference to the tagged message.
    *
@@ -84,7 +84,7 @@ class QueryExecutionUtil {
   static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
       const tmb::client_id sender_id,
       const tmb::client_id receiver_id,
-      QueryHandle *query_handle,
+      std::vector<QueryHandle*> *query_handle,
       MessageBus *bus) {
     std::unique_ptr<AdmitRequestMessage> request_message(
         new AdmitRequestMessage(query_handle));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fb05e6ec/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 8c1d306..92baf19 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -96,10 +96,12 @@ void ExecutionGeneratorTestRunner::runTestCase(
                 logical_generator.generatePlan(*result.parsed_statement));
         execution_generator.generatePlan(physical_plan);
 
+        std::vector<QueryHandle*> query_handles;
+        query_handles.push_back(&query_handle);
         QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
             main_thread_client_id_,
             foreman_->getBusClientID(),
-            &query_handle,
+            &query_handles,
             &bus_);
 
         QueryExecutionUtil::ReceiveQueryCompletionMessage(


[13/21] incubator-quickstep git commit: Created Learner class.

Posted by hb...@apache.org.
Created Learner class.

- Learner keeps track of statistics of concurrent queries.
- It maintains the probabilities for individual queries as well as the
  priority levels in the system.
- Changes to the ProbabilityStore class, including the addition of
  numerator and denominator tracking, and more unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fe319aea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fe319aea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fe319aea

Branch: refs/heads/scheduler++
Commit: fe319aea74ae79c2c5c2d34f88e8391d1aad072d
Parents: 347d039
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  11 +
 query_execution/ExecutionStats.hpp              |  18 +
 query_execution/Learner.cpp                     | 195 ++++++++++
 query_execution/Learner.hpp                     | 352 +++++++++++++++++++
 query_execution/PolicyEnforcer.cpp              |   2 +
 query_execution/ProbabilityStore.hpp            | 148 ++++++--
 .../tests/ProbabilityStore_unittest.cpp         |  45 ++-
 7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
 add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+                      ${GFLAGS_LIB_NAME}
+                      glog
+                      quickstep_queryexecution_ExecutionStats
+                      quickstep_queryexecution_ProbabilityStore
+                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
   }
 
   /**
+   * @brief Check if there are any stats present.
+   **/
+  inline bool hasStats() const {
+    for (auto it = active_operators_.begin();
+         it != active_operators_.end();
+         ++it) {
+      if (!it->second->hasStatsForOperator()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * @brief Get the current stats.
    *
    * @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
       DCHECK_LE(times_.size(), max_entries_);
     }
 
+    inline bool hasStatsForOperator() const {
+      return !times_.empty();
+    }
+
    private:
     const std::size_t max_entries_;
     std::deque<std::uint64_t> times_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void Learner::addCompletionFeedback(
+    const serialization::NormalWorkOrderCompletionMessage
+        &workorder_completion_proto) {
+  const std::size_t query_id = workorder_completion_proto.query_id();
+  DCHECK(isQueryPresent(query_id));
+  const std::size_t priority_level = getQueryPriority(query_id);
+  ExecutionStats *execution_stats = getExecutionStats(query_id);
+  DCHECK(execution_stats != nullptr);
+  execution_stats->addEntry(
+      workorder_completion_proto.execution_time_in_microseconds(),
+      workorder_completion_proto.operator_index());
+
+  // updateProbability();
+  if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+    updateFeedbackFromQueriesInPriorityLevel(priority_level);
+  }
+}
+
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+    const std::size_t priority_level, const std::size_t query_id) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    LOG(INFO) << "Updating probabilities for query ID: " << query_id
+              << " and priority level: " << priority_level
+              << " that has no queries";
+    return;
+  } else if (execution_stats_[priority_level].size() == 1u) {
+    DCHECK(current_probabilities_[priority_level] != nullptr);
+    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+    // As we want the probability of the lone query in this priority level as
+    // 1, we set the numerator same as denominator.
+    const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();
+    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
+                                                              numerator);
+    return;
+  }
+  // Else, there are more than one queries for the given priority level.
+  std::unordered_map<std::size_t, std::size_t>
+      mean_workorders_per_query =
+          getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+  const float denominator = calculateDenominator(mean_workorders_per_query);
+  if (denominator != 0) {
+    // Update the numerator for the given query and denominator for all the
+    // queries.
+    DCHECK(mean_workorders_per_query.find(query_id) !=
+           mean_workorders_per_query.end());
+    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+        query_id, mean_workorders_per_query[query_id], denominator);
+  } else {
+    // At least one of the queries has predicted time for next work order as 0.
+    // In such a case, we don't update the probabilities and continue to use
+    // the older probabilities.
+  }
+}
+
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+    const std::size_t priority_level) {
+  if (!hasFeedbackFromAllPriorityLevels() ||
+      has_feedback_from_all_queries_.empty()) {
+    // Either we don't have enough feedback messages from all the priority
+    // levels OR there are no active queries in the system.
+    return;
+  }
+  // Compute the predicted work order execution times for all the level.
+  std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+  std::size_t sum_active_priorities = 0;
+  for (auto priority_iter : has_feedback_from_all_queries_) {
+    std::size_t total_time_curr_level = 0;
+    const std::size_t curr_priority_level = priority_iter.first;
+    sum_active_priorities += curr_priority_level;
+    // For each query, find its predicted work order execution time.
+    const std::unordered_map<std::size_t, std::size_t>
+        mean_workorders_all_queries_curr_level =
+            getMeanWorkOrderTimesForQueriesInPriorityLevel(
+                curr_priority_level);
+    for (auto mean_workorder_entry : mean_workorders_all_queries_curr_level) {
+      total_time_curr_level += mean_workorder_entry.second;
+    }
+    const std::size_t num_queries_in_priority_level =
+        execution_stats_[priority_level].size();
+    DCHECK_GT(num_queries_in_priority_level, 0u);
+    predicted_time_for_level[curr_priority_level] =
+        total_time_curr_level / num_queries_in_priority_level;
+  }
+  DCHECK_GT(sum_active_priorities, 0u);
+  // Now compute the allowable number of work orders for each priority level
+  // that can be executed given a unit total time.
+  // Key = priority level, value = the # of WO mentioned above.
+  std::unordered_map<std::size_t, float> num_workorders_for_level;
+  float total_num_workorders = 0;
+  for (auto predicted_time_iter : predicted_time_for_level) {
+    const std::size_t curr_priority_level = predicted_time_iter.first;
+    const std::size_t num_workorders_for_curr_level =
+        (predicted_time_iter.second == 0)
+            ? 0
+            : static_cast<float>(curr_priority_level) /
+                  sum_active_priorities /
+                  static_cast<float>(predicted_time_iter.second);
+    num_workorders_for_level[curr_priority_level] = num_workorders_for_curr_level;
+    total_num_workorders += num_workorders_for_curr_level;
+  }
+  if (total_num_workorders == 0) {
+    // No priority level can be selected at this point.
+    return;
+  }
+  // Finally, compute the probabilities.
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  for (auto num_workorders_iter : num_workorders_for_level) {
+    priority_levels.emplace_back(num_workorders_iter.first);
+    numerators.emplace_back(num_workorders_iter.second);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, total_num_workorders);
+}
+
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+  for (auto queries_same_priority_level_iter = execution_stats_.begin();
+       queries_same_priority_level_iter != execution_stats_.end();
+       ++queries_same_priority_level_iter) {
+    std::vector<std::size_t> query_ids;
+    const auto &queries_vector = queries_same_priority_level_iter->second;
+    DCHECK(!queries_vector.empty());
+    for (auto query_iter = queries_vector.cbegin();
+         query_iter != queries_vector.cend();
+         ++query_iter) {
+      query_ids.emplace_back(query_iter->first);
+    }
+    // Numerator for each query is 1.0
+    // The common denominator is number of queries with the given priority level.
+    std::vector<float> numerators(query_ids.size(), 1.0);
+    // Reset the probability store for this level.
+    const std::size_t curr_priority_level =
+        queries_same_priority_level_iter->first;
+    default_probabilities_[curr_priority_level].reset(new ProbabilityStore());
+    default_probabilities_[curr_priority_level]
+        ->addOrUpdateObjectsNewDenominator(
+            query_ids, numerators, query_ids.size());
+  }
+}
+
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  float sum_priority_levels = 0;
+  for (auto priority_iter = execution_stats_.cbegin();
+       priority_iter != execution_stats_.cend();
+       ++priority_iter) {
+    sum_priority_levels += priority_iter->second.size();
+    priority_levels.emplace_back(priority_iter->first);
+    numerators.emplace_back(priority_iter->first);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, sum_priority_levels);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Maximum number of past WorkOrder execution statistics kept per query; the
+// value is passed to the ExecutionStats constructor in
+// Learner::initializeQuery().
+// NOTE(review): DEFINE_int32 in a header defines FLAGS_max_past_entries_learner
+// in every translation unit that includes this file (PolicyEnforcer.cpp does,
+// and Learner.cpp presumably does too), which typically leads to
+// duplicate-symbol link errors. Consider DECLARE_int32 here and moving the
+// DEFINE_int32 into Learner.cpp.
+DEFINE_int32(max_past_entries_learner,
+              10,
+              "The maximum number of past WorkOrder execution statistics"
+              " entries for a query");
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+class Learner {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note probabilities_of_priority_levels_ is created eagerly because
+   *       initializePriorityLevelIfNotPresent() dereferences it as soon as the
+   *       first query is added.
+   **/
+  Learner()
+      : probabilities_of_priority_levels_(new ProbabilityStore()) {
+  }
+
+  /**
+   * @brief Process the completion message of a normal WorkOrder.
+   *
+   * @param workorder_completion_proto The completion message.
+   **/
+  void addCompletionFeedback(
+      const serialization::NormalWorkOrderCompletionMessage
+          &workorder_completion_proto);
+
+  /**
+   * @brief Start tracking a new query and recompute the default
+   *        probabilities.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
+  void addQuery(const QueryHandle &query_handle) {
+    initializePriorityLevelIfNotPresent(query_handle.query_priority());
+    initializeQuery(query_handle);
+    relearn();
+  }
+
+  /**
+   * @brief Stop tracking a query, drop its priority level if it was the last
+   *        query in that level, and recompute the default probabilities.
+   *
+   * @param query_id The ID of the query to be removed. It must be present.
+   **/
+  void removeQuery(const std::size_t query_id) {
+    DCHECK(isQueryPresent(query_id));
+    const std::size_t priority_level = getQueryPriority(query_id);
+    // Find the iterator to the query in execution_stats_.
+    auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
+    DCHECK(stats_iter_mutable != execution_stats_[priority_level].cend());
+    execution_stats_[priority_level].erase(stats_iter_mutable);
+    // NOTE(review): ProbabilityStore::removeObject() DCHECKs that the object
+    // exists, but initializeQuery() does not add the query to
+    // current_probabilities_. Confirm the query is always added there (e.g. by
+    // updateProbabilitiesForQueriesInPriorityLevel()) before it is removed.
+    current_probabilities_[priority_level]->removeObject(query_id);
+    // Forget the query ID too; otherwise isQueryPresent() and
+    // hasActiveQueries() keep reporting the removed query, and
+    // getExecutionStats() would dereference an end() iterator for it.
+    query_id_to_priority_lookup_.erase(query_id);
+    checkAndRemovePriorityLevel(priority_level);
+    relearn();
+  }
+
+  /**
+   * @brief Remove an operator from the execution statistics of a query.
+   *
+   * @param query_id The ID of the query. It is expected to be present.
+   * @param operator_id The ID of the operator to be removed.
+   **/
+  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+    ExecutionStats *stats = getExecutionStats(query_id);
+    DCHECK(stats != nullptr);
+    // The DCHECK is compiled out in release builds; do not dereference null.
+    if (stats != nullptr) {
+      stats->removeOperator(operator_id);
+    }
+  }
+
+  /**
+   * @brief Recompute the default probabilities of all queries and of all
+   *        priority levels, if there is at least one active query.
+   **/
+  void relearn() {
+    if (hasActiveQueries()) {
+      initializeDefaultProbabilitiesForAllQueries();
+      initializeDefaultProbabilitiesForPriorityLevels();
+    }
+  }
+
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  // TODO(harshad) - Cache internal results from previous invocation of this
+  // function and reuse them. There's a lot of redundancy in computations
+  // at this point.
+  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+  /**
+   * @brief Initialize the default probabilities for the queries.
+   **/
+  void initializeDefaultProbabilitiesForAllQueries();
+
+  /**
+   * @brief Initialize the default probabilities for the priority levels.
+   **/
+  void initializeDefaultProbabilitiesForPriorityLevels();
+
+  /**
+   * @brief Initialize the data structures for a given priority level, if none
+   *        exist. If there are already data structures for the given priority
+   *        level, do nothing.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void initializePriorityLevelIfNotPresent(
+      const std::size_t priority_level) {
+    if (!isPriorityLevelPresent(priority_level)) {
+      current_probabilities_[priority_level].reset(new ProbabilityStore());
+      // The level is registered with a placeholder numerator of 0.
+      // TODO(harshad) - Calculate the default probability for the priority
+      // level here and use it instead of 0.
+      probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+      // Create an empty statistics vector for the new level.
+      execution_stats_[priority_level];
+    }
+  }
+
+  /**
+   * @brief First check if the priority level needs to be removed from the local
+   *        data structures and remove if needed.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (execution_stats_[priority_level].empty()) {
+      execution_stats_.erase(priority_level);
+      current_probabilities_.erase(priority_level);
+      // Also drop the level's default probabilities so no stale store is left
+      // behind for a level that has no queries.
+      default_probabilities_.erase(priority_level);
+      probabilities_of_priority_levels_->removeObject(priority_level);
+      has_feedback_from_all_queries_.erase(priority_level);
+    }
+  }
+
+  /**
+   * @brief Check if the Learner has presence of the given priority level.
+   **/
+  inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+    DCHECK_EQ((current_probabilities_.find(priority_level) ==
+               current_probabilities_.end()),
+              execution_stats_.find(priority_level) == execution_stats_.end());
+    return (execution_stats_.find(priority_level) != execution_stats_.end());
+  }
+
+  /**
+   * @brief Check if the query is present.
+   **/
+  inline bool isQueryPresent(const std::size_t query_id) const {
+    return query_id_to_priority_lookup_.find(query_id) !=
+           query_id_to_priority_lookup_.end();
+  }
+
+  /**
+   * @brief Initialize all the data structures for a new query.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
+  void initializeQuery(const QueryHandle &query_handle) {
+    const std::size_t priority_level = query_handle.query_priority();
+    const std::size_t query_id = query_handle.query_id();
+    DCHECK(isPriorityLevelPresent(priority_level));
+    query_id_to_priority_lookup_[query_id] = priority_level;
+    execution_stats_[priority_level].emplace_back(
+        query_id,
+        std::unique_ptr<ExecutionStats>(
+            new ExecutionStats(FLAGS_max_past_entries_learner)));
+    // As we are initializing the query, we obviously haven't gotten any
+    // feedback message for this query. Hence mark the following field as false.
+    has_feedback_from_all_queries_[priority_level] = false;
+  }
+
+  /**
+   * @brief Get the execution stats object for the given query.
+   *
+   * @return A pointer to the ExecutionStats for the query. If not present,
+   *         returns NULL.
+   **/
+  inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+    if (isQueryPresent(query_id)) {
+      const auto stats_iter = getExecutionStatsIterMutable(query_id);
+      DCHECK(stats_iter !=
+             std::end(execution_stats_[getQueryPriority(query_id)]));
+      return stats_iter->second.get();
+    }
+    return nullptr;
+  }
+
+  /**
+   * @brief Get an iterator to the (query ID, ExecutionStats) entry of the
+   *        given query inside its priority level's vector.
+   *
+   * @note The query must be present in query_id_to_priority_lookup_. If the
+   *       entry is missing from the level's vector, that vector's end
+   *       iterator is returned.
+   * @note Despite the name, a const_iterator is returned; it can still be
+   *       passed to vector::erase(), and the ExecutionStats it points to is
+   *       mutable through the unique_ptr.
+   **/
+  inline std::vector<
+      std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+      getExecutionStatsIterMutable(const std::size_t query_id) {
+    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_[priority_level];
+    // The following line uses std::find_if to reach to the desired element
+    // in the stats_vector.
+    auto stats_iter = std::find_if(
+        stats_vector.begin(),
+        stats_vector.end(),
+        [&query_id](
+            const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+          return p.first == query_id;
+        });
+    return stats_iter;
+  }
+
+  /**
+   * @brief Get the priority level of a query. The query must be present.
+   **/
+  inline std::size_t getQueryPriority(const std::size_t query_id) const {
+    const auto it = query_id_to_priority_lookup_.find(query_id);
+    DCHECK(it != query_id_to_priority_lookup_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Check if we have received at least one feedback message from all the
+   *        queries in the given priority level.
+   **/
+  inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_.at(priority_level);
+    for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+      DCHECK(stats_vector[i].second != nullptr);
+      if (!stats_vector[i].second->hasStats()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Mark the priority level as having feedback from all its queries
+   *        once every query has at least one execution statistic. The flag is
+   *        never cleared here.
+   **/
+  inline void updateFeedbackFromQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+      // All the queries have at least one execution statistic.
+      has_feedback_from_all_queries_[priority_level] = true;
+    }
+  }
+
+  /**
+   * @brief Check whether the Learner is tracking at least one query.
+   **/
+  inline bool hasActiveQueries() const {
+    return !query_id_to_priority_lookup_.empty();
+  }
+
+  /**
+   * @brief Get the mean work order execution times for all the queries in
+   *        a given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return An unordered_map in which: Key = query ID.
+   *         Value = Mean time per work order for that query (integer
+   *         division; 0 if the query has no recorded work orders).
+   **/
+  inline std::unordered_map<std::size_t, std::size_t>
+  getMeanWorkOrderTimesForQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    std::unordered_map<std::size_t, std::size_t> result;
+    for (auto it = execution_stats_[priority_level].begin();
+         it != execution_stats_[priority_level].end();
+         ++it) {
+      DCHECK(it->second.get() != nullptr);
+      auto query_stats = it->second->getCurrentStats();
+      result[it->first] =
+          query_stats.second == 0 ? 0 : query_stats.first / query_stats.second;
+    }
+    return result;
+  }
+
+  /**
+   * @brief Compute the sum of the reciprocals of the mean work order times.
+   *
+   * @param mean_workorder_per_query A map in which: Key = query ID,
+   *        Value = mean time per work order for that query.
+   *
+   * @note If any query has mean work order time as 0, we return 0 as the
+   *       denominator.
+   *
+   * @return The denominator to be used for probability calculations.
+   **/
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
+    float denominator = 0;
+    for (const auto &element : mean_workorder_per_query) {
+      if (element.second != 0) {
+        denominator += 1/static_cast<float>(element.second);
+      } else {
+        return 0;
+      }
+    }
+    return denominator;
+  }
+
+  /**
+   * @brief Check if every tracked priority level has feedback from all of its
+   *        queries.
+   **/
+  inline bool hasFeedbackFromAllPriorityLevels() const {
+    for (const auto &feedback : has_feedback_from_all_queries_) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Key = Priority level, value = A vector of pairs.
+  // Each pair:
+  // 1st element: Query ID.
+  // 2nd Element: Execution statistics for the query.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+      execution_stats_;
+
+  // Key = query ID, value = priority level for the query ID.
+  std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+  // Key = priority level, value = ProbabilityStore for the queries belonging to
+  // that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      current_probabilities_;
+
+  // Key = priority level, value = ProbabilityStore holding the default
+  // probabilities of the queries in that priority level, rebuilt by relearn().
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      default_probabilities_;
+
+  // ProbabilityStore for probabilities mapped to the priority levels.
+  std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+  // Key = priority level. Value = A boolean that indicates if we have received
+  // feedback from all the queries in the given priority level.
+  // TODO(harshad) - Invalidate the cache whenever needed.
+  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
 #include <cstddef>
 #include <random>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : mt_(std::random_device()()) {}
+      // common_denominator_ starts at 1.0, so until it is changed, a numerator
+      // passed to addOrUpdateObject() is used directly as the probability.
+      // mt_ is seeded from std::random_device.
+      : common_denominator_(1.0), mt_(std::random_device()()) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
     return individual_probabilities_.size();
   }
 
+  /**
+   * @brief Get the common denominator currently used by the store.
+   *
+   * @return The common denominator. It is stored as a float and returned as
+   *         one; returning an integer type would truncate fractional values.
+   **/
+  inline float getDenominator() const {
+    return common_denominator_;
+  }
+
   /**
    * @brief Add individual (not cumulative) probability for a given object.
    *
@@ -59,16 +64,48 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param property The property of the given object.
-   * @param individual_probability The individual (not cumulative) probability
-   *        of the given object.
+   * @param numerator The numerator for the given object.
    **/
-  void addProbability(const std::size_t property,
-                      const float individual_probability) {
-    individual_probabilities_[property] = individual_probability;
+  void addOrUpdateObject(const std::size_t property,
+                         const float numerator) {
+    DCHECK_LE(numerator, common_denominator_);
+    // NOTE: common_denominator_ is left unchanged and the other objects are
+    // not rescaled, so the caller is responsible for keeping all numerators
+    // consistent with the current denominator.
+    // We should have the correct individual probability in
+    // individual_probabilities_ for the newly added object at this point.
+    // Because we rely on the probabilities for all the objects in
+    // updateCumulativeProbabilities().
+    individual_probabilities_[property] =
+        std::make_pair(numerator, numerator / common_denominator_);
     updateCumulativeProbabilities();
   }
 
   /**
+   * @brief Add individual (not cumulative) probability for a given object with
+   *        updated denominator.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param numerator The numerator for the given object.
+   * @param new_denominator The updated denominator for the store.
+   **/
+  void addOrUpdateObjectNewDenominator(const std::size_t property,
+                                       const float numerator,
+                                       const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_LE(numerator, new_denominator);
+    common_denominator_ = new_denominator;
+    // It is alright to not have the correct probability in
+    // individual_probabilities_ for the newly added object at this point.
+    // Because we compute the probabilities for all the objects in
+    // updateProbabilitiesNewDenominator().
+    individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+    updateProbabilitiesNewDenominator();
+  }
+
+  /**
    * @brief Add individual (not cumulative) probabilities for given objects.
    *
    * @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param properties A vector of properties to be added.
-   * @param individual_probabilities The individual (not cumulative)
-   *        probabilities of the given objects.
+   * @param numerators The numerators of the given objects.
    **/
-  void addProbabilities(const std::vector<std::size_t> &properties,
-                        const std::vector<float> &individual_probabilities) {
-    DCHECK_EQ(properties.size(), individual_probabilities.size());
+  void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+                          const std::vector<float> &numerators) {
+    DCHECK_EQ(properties.size(), numerators.size());
+    // As with the single-object variant, common_denominator_ is left unchanged
+    // and objects not listed in properties are not rescaled.
     for (std::size_t i = 0; i < properties.size(); ++i) {
-      individual_probabilities_[properties[i]] = individual_probabilities[i];
+      DCHECK_LE(numerators[i], common_denominator_);
+      // We should have the correct individual probability in
+      // individual_probabilities_ for the newly added object at this point.
+      // Because we rely on the probabilities for all the objects in
+      // updateCumulativeProbabilities().
+      individual_probabilities_[properties[i]] =
+          std::make_pair(numerators[i], numerators[i] / common_denominator_);
     }
     updateCumulativeProbabilities();
   }
 
-  /**
-   * @brief Update  the probability of a given object to a new value.
-   *
-   * @param property The property of the object.
-   * @param new_individual_probability The new probability to be set.
-   **/
-  void updateProbability(const std::size_t property,
-                         const float new_individual_probability) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    it->second = new_individual_probability;
-    updateCumulativeProbabilities();
+  /**
+   * @brief Insert or overwrite the numerators of several objects while
+   *        switching the store to a new common denominator.
+   *
+   * @note All individual and cumulative probabilities are recomputed against
+   *       the new denominator before this function returns.
+   * @note Any values previously stored for the given properties are
+   *       overwritten.
+   *
+   * @param properties The properties of the objects to be added or updated.
+   * @param numerators The numerators of the objects, parallel to properties.
+   * @param new_denominator The updated denominator for the store. Must be
+   *        positive.
+   **/
+  void addOrUpdateObjectsNewDenominator(
+      const std::vector<std::size_t> &properties,
+      const std::vector<float> &numerators,
+      const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_EQ(properties.size(), numerators.size());
+    common_denominator_ = new_denominator;
+    // Record only the numerators here; the probability half of each entry is
+    // a placeholder until updateProbabilitiesNewDenominator() fills it in.
+    for (std::size_t i = 0; i != properties.size(); ++i) {
+      DCHECK_LE(numerators[i], common_denominator_);
+      std::pair<float, float> &slot = individual_probabilities_[properties[i]];
+      slot.first = numerators[i];
+      slot.second = 0.0;
+    }
+    updateProbabilitiesNewDenominator();
   }
 
   /**
@@ -109,10 +156,24 @@ class ProbabilityStore {
    * @param property The property of the object to be removed.
    **/
   void removeObject(const std::size_t property) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    individual_probabilities_.erase(it);
-    updateCumulativeProbabilities();
+    auto individual_it = individual_probabilities_.find(property);
+    DCHECK(individual_it != individual_probabilities_.end());
+    individual_probabilities_.erase(individual_it);
+    if (!individual_probabilities_.empty()) {
+      float new_denominator = 0;
+      for (auto it = individual_probabilities_.begin();
+           it != individual_probabilities_.end();
+           ++it) {
+        new_denominator += it->second.first;
+      }
+      CHECK_GT(new_denominator, 0);
+      common_denominator_ = new_denominator;
+      updateCumulativeProbabilities();
+    } else {
+      // In order to keep the store consistent, we should keep the sizes of
+      // individual_probabilities_ and cumulative_probabilities_ the same.
+      cumulative_probabilities_.clear();
+    }
   }
 
   /**
@@ -123,7 +184,7 @@ class ProbabilityStore {
   const float getIndividualProbability(const std::size_t property) const {
     const auto it = individual_probabilities_.find(property);
     DCHECK(it != individual_probabilities_.end());
-    return it->second;
+    // it->second is (numerator, individual probability); the probability is
+    // the value stored when the entry was last (re)computed.
+    return it->second.second;
   }
 
   /**
@@ -141,13 +202,13 @@ class ProbabilityStore {
       return;
     }
     float cumulative_probability = 0;
-    for (const auto property_probability_pair : individual_probabilities_) {
-      cumulative_probabilities_.emplace_back(property_probability_pair.first,
+    for (const auto p : individual_probabilities_) {
+      cumulative_probabilities_.emplace_back(p.first,
                                              cumulative_probability);
-      cumulative_probability += property_probability_pair.second;
+      cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that floating
-    // addition related rounding issues are ignored.
+    // Adjust the last cumulative probability manually to 1.0, so that
+    // floating addition related rounding issues are ignored.
     cumulative_probabilities_.back().updateProbability(1.0);
   }
 
@@ -208,9 +269,26 @@ class ProbabilityStore {
     return it->property_;
   }
 
-  std::unordered_map<std::size_t, float> individual_probabilities_;
+  /**
+   * @brief Recompute every object's individual probability as its numerator
+   *        divided by common_denominator_, then rebuild the cumulative
+   *        probabilities from those values.
+   **/
+  inline void updateProbabilitiesNewDenominator() {
+    auto it = individual_probabilities_.begin();
+    while (it != individual_probabilities_.end()) {
+      DCHECK_LE(it->second.first, common_denominator_);
+      std::pair<float, float> &numerator_and_probability = it->second;
+      numerator_and_probability.second =
+          numerator_and_probability.first / common_denominator_;
+      ++it;
+    }
+    updateCumulativeProbabilities();
+  }
+
+  // Key = property of the object.
+  // Value = A pair:
+  // 1st element: Numerator of the object.
+  // 2nd element: Individual probability of the object (the numerator divided
+  //              by the common denominator at the time it was computed).
+  std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
   std::vector<ProbabilityInfo> cumulative_probabilities_;
 
+  // Denominator shared by all objects; individual probabilities are computed
+  // as numerator / common_denominator_. The constructor initializes it to 1.0.
+  float common_denominator_;
+
   std::mt19937_64 mt_;
 
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fe319aea/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -28,14 +28,15 @@ TEST(ProbabilityStoreTest, CountTest) {
   ProbabilityStore store;
   EXPECT_EQ(0u, store.getNumObjects());
   const std::size_t kProperty = 0;
-  store.addProbability(kProperty, 0.5);
+  store.addOrUpdateObject(kProperty, 1);
   EXPECT_EQ(1u, store.getNumObjects());
   store.removeObject(kProperty);
   EXPECT_EQ(0u, store.getNumObjects());
 
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   EXPECT_EQ(objects.size(), store.getNumObjects());
 }
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
 TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  // The numerators sum to kNewDenominator, so the individual probabilities
+  // (0.1, 0.2, 0.3, 0.4) form a proper distribution that sums to 1.
+  std::vector<float> numerators {1, 2, 3, 4};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
-    EXPECT_EQ(probabilities[object_num],
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
               store.getIndividualProbability(objects[object_num]));
   }
 }
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
 TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   const std::size_t kNumTrials = 10;
   while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   }
 }
 
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  // The numerators sum to kNewDenominator, so the initial probabilities form
+  // a proper distribution.
+  std::vector<float> numerators {1, 2, 3, 4};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+              store.getIndividualProbability(objects[object_num]));
+  }
+
+  // Remove last object "9", with numerator 4.
+  store.removeObject(objects.back());
+  objects.pop_back();
+  numerators.pop_back();
+  EXPECT_EQ(objects.size(), store.getNumObjects());
+
+  // Accumulate in float: an int initial value would truncate every
+  // fractional numerator.
+  const float expected_new_denominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+  EXPECT_EQ(expected_new_denominator, store.getDenominator());
+
+  // The remaining objects must keep their relative weights.
+  for (std::size_t object_num = 1; object_num < objects.size(); ++object_num) {
+    EXPECT_FLOAT_EQ(
+        numerators[object_num] / numerators[0],
+        store.getIndividualProbability(objects[object_num]) /
+            store.getIndividualProbability(objects[0]));
+  }
+}
+
 }  // namespace quickstep