You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/05 15:43:44 UTC
[02/21] incubator-quickstep git commit: API to find the highest
priority level in the learner
API to find the highest priority level in the learner
- Unit tests to test the feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/996ca75b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/996ca75b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/996ca75b
Branch: refs/heads/scheduler++
Commit: 996ca75b4368ee0fc338af5707d52bfd62ef7389
Parents: 2d93312
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 5 10:43:10 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 12 +++--
query_execution/Learner.cpp | 33 +++++++++++++
query_execution/Learner.hpp | 30 +++++++-----
query_execution/ProbabilityStore.hpp | 10 ++--
query_execution/QueryExecutionTypedefs.hpp | 2 +
query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
glog
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
- ${GFLAGS_LIB_NAME}
+ ${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
glog
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_ProbabilityStore
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
gtest
gtest_main
quickstep_queryexecution_Learner
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest)
+add_test(Learner_unittest Learner_unittest)
add_executable(ProbabilityStore_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
gtest
gtest_main
quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
"The maximum number of past WorkOrder execution statistics"
" entries for a query");
+Learner::Learner()
+ : highest_priority_level_(kInvalidPriorityLevel) {  // No level known yet.
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
+}
+
void Learner::addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
has_feedback_from_all_queries_[priority_level] = false;
}
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (execution_stats_[priority_level].empty()) {  // Level has no stats left.
+ execution_stats_.erase(priority_level);
+ current_probabilities_.erase(priority_level);
+ probabilities_of_priority_levels_->removeObject(priority_level);
+ has_feedback_from_all_queries_.erase(priority_level);
+ if (hasActiveQueries()) {
+ if (static_cast<int>(priority_level) == highest_priority_level_) {
+ // The erased level was the highest one; restart the search from 0.
+ std::size_t new_highest_priority_level = 0;
+ // Take the largest key still present in execution_stats_.
+ for (auto priority_level_it = execution_stats_.cbegin();
+ priority_level_it != execution_stats_.cend();
+ ++priority_level_it) {
+ if (priority_level_it->first > new_highest_priority_level) {
+ new_highest_priority_level = priority_level_it->first;
+ }
+ }
+ highest_priority_level_ = static_cast<int>(new_highest_priority_level);
+ }
+ } else {  // No active queries: no priority level is valid any more.
+ highest_priority_level_ = kInvalidPriorityLevel;
+ }
+ }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
/**
* @brief Constructor.
**/
- Learner() {
- probabilities_of_priority_levels_.reset(new ProbabilityStore());
- }
+ Learner();
void addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
return query_id_to_priority_lookup_.size();
}
+ /**
+ * @brief Get the highest priority level among the active queries.
+ *
+ * @return The highest priority level, or kInvalidPriorityLevel if the
+ * system is empty.
+ **/
+ inline int getHighestPriorityLevel() const {
+ return highest_priority_level_;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
+ CHECK_GT(priority_level, 0u) << "Priority level should be non-zero";
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
execution_stats_[priority_level];
+ if (static_cast<int>(priority_level) > highest_priority_level_) {
+ highest_priority_level_ = static_cast<int>(priority_level);  // size_t -> int.
+ }
}
}
@@ -163,15 +175,7 @@ class Learner {
*
* @param priority_level The priority level.
**/
- inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
- DCHECK(isPriorityLevelPresent(priority_level));
- if (execution_stats_[priority_level].empty()) {
- execution_stats_.erase(priority_level);
- current_probabilities_.erase(priority_level);
- probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
- }
- }
+ void checkAndRemovePriorityLevel(const std::size_t priority_level);
/**
* @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
// feedback from all the queries in the given priority level.
std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ int highest_priority_level_;
+
DISALLOW_COPY_AND_ASSIGN(Learner);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
cumulative_probability);
cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that
+ DCHECK(!cumulative_probabilities_.empty());
+ // Adjust the last cumulative probability manually to 1, so that
// floating addition related rounding issues are ignored.
- cumulative_probabilities_.back().updateProbability(1.0);
+ cumulative_probabilities_.back().updateProbability(1);
}
/**
@@ -233,7 +234,9 @@ class ProbabilityStore {
public:
ProbabilityInfo(const std::size_t property, const float probability)
: property_(property), probability_(probability) {
- DCHECK_LE(probability, 1.0);
+ // Not range-checked: the author found no debug-only glog check for a
+ // less-than-or-equal comparison on floats, so probability <= 1.0 is not
+ // verified here. NOTE(review): confirm whether DCHECK_LE with 1.0f works.
}
ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
void updateProbability(const float new_probability) {
- DCHECK_LE(new_probability, 1.0);
probability_ = new_probability;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
+const int kInvalidPriorityLevel = -1;
+
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
'l',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/996ca75b/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
* limitations under the License.
**/
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
#include <memory>
+#include <random>
+#include <vector>
#include "gtest/gtest.h"
#include "query_execution/Learner.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
}
}
}
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Both vectors get 1..kNumPrioritiesToTest; levels must be non-zero.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Shuffle both orders. The seed comes from std::random_device (varies per run).
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // Add one query per priority level, in priorities_insertion_order; after
+ // each add the highest level must equal the maximum inserted so far.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // The query ID is kept the same as the priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t max_priority_so_far =
+ *(std::max_element(priorities_insertion_order.begin(), it + 1));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ }
+ // Remove the queries in priorities_removal_order; before each removal the
+ // highest level must equal the maximum of the levels not yet removed.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Here the variable holds the max over [it, end): the levels still present.
+ const std::size_t max_priority_so_far =
+ *(std::max_element(it, priorities_removal_order.end()));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
} // namespace quickstep