You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/05 15:43:44 UTC

[02/21] incubator-quickstep git commit: API to find the highest priority level in the learner

API to find the highest priority level in the learner

- Unit tests to test the feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/996ca75b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/996ca75b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/996ca75b

Branch: refs/heads/scheduler++
Commit: 996ca75b4368ee0fc338af5707d52bfd62ef7389
Parents: 2d93312
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             | 12 +++--
 query_execution/Learner.cpp                | 33 +++++++++++++
 query_execution/Learner.hpp                | 30 +++++++-----
 query_execution/ProbabilityStore.hpp       | 10 ++--
 query_execution/QueryExecutionTypedefs.hpp |  2 +
 query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
 6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
                       glog
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
-                      ${GFLAGS_LIB_NAME} 
+                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_ProbabilityStore
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_Learner
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest) 
+add_test(Learner_unittest Learner_unittest)
 
 add_executable(ProbabilityStore_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest) 
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
 
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
               "The maximum number of past WorkOrder execution statistics"
               " entries for a query");
 
+Learner::Learner()
+    : highest_priority_level_(kInvalidPriorityLevel) {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+}
+
 void Learner::addCompletionFeedback(
     const serialization::NormalWorkOrderCompletionMessage
         &workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
   has_feedback_from_all_queries_[priority_level] = false;
 }
 
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    execution_stats_.erase(priority_level);
+    current_probabilities_.erase(priority_level);
+    probabilities_of_priority_levels_->removeObject(priority_level);
+    has_feedback_from_all_queries_.erase(priority_level);
+    if (hasActiveQueries()) {
+      if (static_cast<int>(priority_level) == highest_priority_level_) {
+        // The priority level just removed was the highest priority level.
+        std::size_t new_highest_priority_level = 0;
+        // Find the new highest priority level.
+        for (auto priority_level_it = execution_stats_.cbegin();
+             priority_level_it != execution_stats_.cend();
+             ++priority_level_it) {
+          if (priority_level_it->first > new_highest_priority_level) {
+            new_highest_priority_level = priority_level_it->first;
+          }
+        }
+        highest_priority_level_ = static_cast<int>(new_highest_priority_level);
+      }
+    } else {
+      highest_priority_level_ = kInvalidPriorityLevel;
+    }
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
   /**
    * @brief Constructor.
    **/
-  Learner() {
-    probabilities_of_priority_levels_.reset(new ProbabilityStore());
-  }
+  Learner();
 
   void addCompletionFeedback(
       const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
     return query_id_to_priority_lookup_.size();
   }
 
+  /**
+   * @brief Get the highest priority level among the active queries.
+   *
+   * @return The highest priority level. If there are no active queries, it
+   *         returns kInvalidPriorityLevel.
+   **/
+  inline const int getHighestPriorityLevel() const {
+    return highest_priority_level_;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
+    CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
       execution_stats_[priority_level];
+      if (static_cast<int>(priority_level) > highest_priority_level_) {
+        highest_priority_level_ = priority_level;
+      }
     }
   }
 
@@ -163,15 +175,7 @@ class Learner {
    *
    * @param priority_level The priority level.
    **/
-  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
-    DCHECK(isPriorityLevelPresent(priority_level));
-    if (execution_stats_[priority_level].empty()) {
-      execution_stats_.erase(priority_level);
-      current_probabilities_.erase(priority_level);
-      probabilities_of_priority_levels_->removeObject(priority_level);
-      has_feedback_from_all_queries_.erase(priority_level);
-    }
-  }
+  void checkAndRemovePriorityLevel(const std::size_t priority_level);
 
   /**
    * @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
   // feedback from all the queries in the given priority level.
   std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
+  int highest_priority_level_;
+
   DISALLOW_COPY_AND_ASSIGN(Learner);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
                                              cumulative_probability);
       cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that
+    DCHECK(!cumulative_probabilities_.empty());
+    // Adjust the last cumulative probability manually to 1, so that
     // floating addition related rounding issues are ignored.
-    cumulative_probabilities_.back().updateProbability(1.0);
+    cumulative_probabilities_.back().updateProbability(1);
   }
 
   /**
@@ -233,7 +234,9 @@ class ProbabilityStore {
    public:
     ProbabilityInfo(const std::size_t property, const float probability)
         : property_(property), probability_(probability) {
-      DCHECK_LE(probability, 1.0);
+      // As GLOG doesn't provide DEBUG-only checks for less-than-or-equal
+      // comparison of floats, we can't ensure that probability is at most
+      // 1.0.
     }
 
     ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
     ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
 
     void updateProbability(const float new_probability) {
-      DCHECK_LE(new_probability, 1.0);
       probability_ = new_probability;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
 typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
+const int kInvalidPriorityLevel = -1;
+
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',
                                      'l',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
  *   limitations under the License.
  **/
 
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
 #include <memory>
+#include <random>
+#include <vector>
 
 #include "gtest/gtest.h"
 
 #include "query_execution/Learner.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 
 namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
     }
   }
 }
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t max_priority_so_far =
+        *(std::max_element(priorities_insertion_order.begin(), it + 1));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+  }
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t max_priority_so_far =
+        *(std::max_element(it, priorities_removal_order.end()));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
 }  // namespace quickstep