You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/30 20:49:39 UTC
[01/18] incubator-quickstep git commit: QUICKSTEP-33: Fixed the bug in NumericCast. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/scheduler++ 3f345fc7e -> ad8f0d3c2 (forced update)
QUICKSTEP-33: Fixed the bug in NumericCast.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b258821e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b258821e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b258821e
Branch: refs/heads/scheduler++
Commit: b258821ef6c00df199e52249eb1099a6d885bbb1
Parents: 3347003
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Wed Jun 29 14:01:13 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 30 13:26:50 2016 -0700
----------------------------------------------------------------------
types/operations/unary_operations/NumericCastOperation.hpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b258821e/types/operations/unary_operations/NumericCastOperation.hpp
----------------------------------------------------------------------
diff --git a/types/operations/unary_operations/NumericCastOperation.hpp b/types/operations/unary_operations/NumericCastOperation.hpp
index 250df6d..6662796 100644
--- a/types/operations/unary_operations/NumericCastOperation.hpp
+++ b/types/operations/unary_operations/NumericCastOperation.hpp
@@ -126,7 +126,7 @@ class UncheckedNumericCastOperator : public UncheckedUnaryOperator {
result->appendNullValue();
} else {
*static_cast<typename TargetType::cpptype*>(result->getPtrForDirectWrite())
- = static_cast<typename SourceType::cpptype>(*scalar_arg);
+ = static_cast<typename TargetType::cpptype>(*scalar_arg);
}
}
return result;
[07/18] incubator-quickstep git commit: Bug fix in initialization of probabilities.
Posted by hb...@apache.org.
Bug fix in initialization of probabilities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3e3dda81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3e3dda81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3e3dda81
Branch: refs/heads/scheduler++
Commit: 3e3dda81754b74ef0da3c37a43bc9cac70a78ccf
Parents: 7ea9d32
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:57:01 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 7 +++--
query_execution/tests/Learner_unittest.cpp | 38 +++++++++++++++++++++++++
2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3e3dda81/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 5d877b4..38a773b 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -184,9 +184,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
for (auto priority_iter = execution_stats_.cbegin();
priority_iter != execution_stats_.cend();
++priority_iter) {
- sum_priority_levels += priority_iter->second.size();
- priority_levels.emplace_back(priority_iter->first);
- numerators.emplace_back(priority_iter->first);
+ const std::size_t curr_priority_level = priority_iter->first;
+ sum_priority_levels += curr_priority_level;
+ priority_levels.emplace_back(curr_priority_level);
+ numerators.emplace_back(curr_priority_level);
}
if (sum_priority_levels > 0) {
probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3e3dda81/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 74353f0..864bb22 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -84,4 +84,42 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
+TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel1 = 1;
+ const std::size_t kPriorityLevel2 = 2;
+ handle1.reset(new QueryHandle(1, kPriorityLevel1));
+ handle2.reset(new QueryHandle(2, kPriorityLevel2));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.addQuery(*handle1);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.addQuery(*handle2);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.removeQuery(handle2->query_id());
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+
+ learner.removeQuery(handle1->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+}
+
} // namespace quickstep
[03/18] incubator-quickstep git commit: Bug fixed in probability store.
Posted by hb...@apache.org.
Bug fixed in probability store.
- Correct calculation for cumulative probabilities after removing an
object.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c1a44e2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c1a44e2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c1a44e2b
Branch: refs/heads/scheduler++
Commit: c1a44e2bd7d5f26b0f000a9bc8a52aba0529ea32
Parents: 63c12a1
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 26 09:35:11 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/ProbabilityStore.hpp | 4 ++--
.../tests/ProbabilityStore_unittest.cpp | 18 +++++++++---------
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1a44e2b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index ed52f75..7278e2b 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -173,7 +173,7 @@ class ProbabilityStore {
}
CHECK_GT(new_denominator, 0);
common_denominator_ = new_denominator;
- updateCumulativeProbabilities();
+ updateProbabilitiesNewDenominator();
} else {
// In order to keep the store consistent, we should keep the sizes of
// individual_probabilities_ and cumulative_probabilities_ the same.
@@ -208,9 +208,9 @@ class ProbabilityStore {
}
float cumulative_probability = 0;
for (const auto p : individual_probabilities_) {
+ cumulative_probability += p.second.second;
cumulative_probabilities_.emplace_back(p.first,
cumulative_probability);
- cumulative_probability += p.second.second;
}
DCHECK(!cumulative_probabilities_.empty());
// Adjust the last cumulative probability manually to 1, so that
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c1a44e2b/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index dcec1e5..518699e 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -34,8 +34,8 @@ TEST(ProbabilityStoreTest, CountTest) {
EXPECT_EQ(0u, store.getNumObjects());
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> numerators {1, 2, 3, 5};
- const std::size_t kNewDenominator = 10;
+ std::vector<float> numerators {1, 2, 2, 5};
+ const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
EXPECT_EQ(objects.size(), store.getNumObjects());
@@ -44,8 +44,8 @@ TEST(ProbabilityStoreTest, CountTest) {
TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> numerators {1, 2, 3, 5};
- const std::size_t kNewDenominator = 10;
+ std::vector<float> numerators {1, 2, 2, 5};
+ const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
@@ -57,8 +57,8 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> numerators {1, 2, 3, 5};
- const std::size_t kNewDenominator = 10;
+ std::vector<float> numerators {1, 2, 2, 5};
+ const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
const std::size_t kNumTrials = 10;
@@ -78,8 +78,8 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
TEST(ProbabilityStoreTest, RemoveObjectTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> numerators {1, 2, 3, 5};
- const std::size_t kNewDenominator = 10;
+ std::vector<float> numerators {1, 2, 2, 5};
+ const std::size_t kNewDenominator = std::accumulate(numerators.begin(), numerators.end(), 0);
store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
@@ -96,7 +96,7 @@ TEST(ProbabilityStoreTest, RemoveObjectTest) {
EXPECT_EQ(expected_new_denominator, store.getDenominator());
for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
- EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(expected_new_denominator),
store.getIndividualProbability(objects[object_num]));
}
}
[12/18] incubator-quickstep git commit: Fixed getDenominator method. More unit tests.
Posted by hb...@apache.org.
Fixed getDenominator method. More unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/63c12a1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/63c12a1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/63c12a1a
Branch: refs/heads/scheduler++
Commit: 63c12a1a62f7cd760776dc42819dcc2bdadd73f6
Parents: b65cc2f
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/ExecutionStats.hpp | 15 +-
query_execution/Learner.cpp | 46 +++--
query_execution/Learner.hpp | 147 ++++++++++++--
query_execution/ProbabilityStore.hpp | 15 +-
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
}
/**
- * @brief Check if there are any stats present.
+ * @brief Check if there are stats present for at least one active operator.
**/
inline bool hasStats() const {
for (auto it = active_operators_.begin();
it != active_operators_.end();
++it) {
- if (!it->second->hasStatsForOperator()) {
- return false;
+ if (it->second->hasStatsForOperator()) {
+ return true;
}
}
- return true;
+ return false;
}
/**
@@ -109,14 +109,13 @@ class ExecutionStats {
* @param operator_index The operator index which the value belongs to.
**/
void addEntry(std::size_t value, std::size_t operator_index) {
- if (hasOperator(operator_index)) {
- // This is not the first entry for the given operator.
- active_operators_[operator_index]->addEntry(value);
- } else {
+ if (!hasOperator(operator_index)) {
+ // This is the first entry for the given operator.
// Create the OperatorStats object for this operator.
active_operators_[operator_index] =
std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
}
+ active_operators_[operator_index]->addEntry(value);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
DCHECK(current_probabilities_[priority_level] != nullptr);
// As we want the probability of the lone query in this priority level as
// 1, we set the numerator same as denominator.
- const std::size_t numerator =
- current_probabilities_[priority_level]->getDenominator();
- current_probabilities_[priority_level]->addOrUpdateObject(query_id,
- numerator);
+ // TODO(harshad) - Get the mean work order times here too and use that as
+ // the numerator.
+ ExecutionStats *stats = getExecutionStats(query_id);
+ auto query_stats = stats->getCurrentStats();
+ /*const std::size_t numerator =
+ current_probabilities_[priority_level]->getDenominator();*/
+ if (query_stats.second != 0) {
+ const float mean_workorder_time =
+ query_stats.first / static_cast<float>(query_stats.second);
+ if (mean_workorder_time != 0) {
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+ }
+ }
return;
}
// Else, there are more than one queries for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
// queries.
DCHECK(mean_workorders_per_query.find(query_id) !=
mean_workorders_per_query.end());
+ DCHECK_NE(mean_workorders_per_query[query_id], 0);
current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
- query_id, mean_workorders_per_query[query_id], denominator);
+ query_id,
+ 1 / static_cast<float>(mean_workorders_per_query[query_id]),
+ denominator);
+ // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
// the older probabilities.
+ // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+ return;
}
}
void Learner::updateProbabilitiesOfAllPriorityLevels() {
- if (!hasFeedbackFromAllPriorityLevels() ||
- has_feedback_from_all_queries_.empty()) {
+ if (!hasFeedbackFromAllPriorityLevels()) {
+ // has_feedback_from_all_queries_.empty()) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Either we don't have enough feedback messages from all the priority
// levels OR there are no active queries in the system.
return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
// Compute the predicted work order execution times for all the level.
std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
std::size_t sum_active_priorities = 0;
- for (auto priority_iter : has_feedback_from_all_queries_) {
+ for (auto priority_iter = execution_stats_.begin();
+ priority_iter != execution_stats_.end();
+ ++priority_iter) {
std::size_t total_time_curr_level = 0;
- const std::size_t curr_priority_level = priority_iter.first;
+ const std::size_t curr_priority_level = priority_iter->first;
sum_active_priorities += curr_priority_level;
// For each query, find its predicted work order execution time.
const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
for (auto priority_iter = execution_stats_.cbegin();
priority_iter != execution_stats_.cend();
++priority_iter) {
+ DCHECK(!priority_iter->second.empty());
const std::size_t curr_priority_level = priority_iter->first;
sum_priority_levels += curr_priority_level;
priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
new ExecutionStats(FLAGS_max_past_entries_learner)));
// As we are initializing the query, we obviously haven't gotten any
// feedback message for this query. Hence mark the following field as false.
- has_feedback_from_all_queries_[priority_level] = false;
+ // has_feedback_from_all_queries_[priority_level] = false;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
}
void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
execution_stats_.erase(priority_level);
current_probabilities_.erase(priority_level);
probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_.erase(priority_level);
+ // LOG(INFO) << "Removed priority level: " << priority_level;
if (hasActiveQueries()) {
if (static_cast<int>(priority_level) == highest_priority_level_) {
// The priority level to be removed is the highest priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto);
+ /**
+ * @brief Add a query to the Learner.
+ *
+ * @param query_handle The query handle for the new query.
+ **/
void addQuery(const QueryHandle &query_handle) {
initializePriorityLevelIfNotPresent(query_handle.query_priority());
initializeQuery(query_handle);
relearn();
}
+ /**
+ * @brief Remove a query from the Learner.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
void removeQuery(const std::size_t query_id) {
- // Find the iterator to the query in execution_stats_.
DCHECK(isQueryPresent(query_id));
const std::size_t priority_level = getQueryPriority(query_id);
+ // Find the iterator to the query in execution_stats_.
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
// current_probabilities_[priority_level] ProbabilityStore.
current_probabilities_[priority_level]->removeObject(query_id);
}
+ // has_feedback_from_all_queries_[priority_level] = false;
query_id_to_priority_lookup_.erase(query_id);
checkAndRemovePriorityLevel(priority_level);
relearn();
}
- void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+ /**
+ * @brief Remove the stats of a given operator in a given query.
+ **/
+ void removeOperator(const std::size_t query_id,
+ const std::size_t operator_id) {
ExecutionStats *stats = getExecutionStats(query_id);
DCHECK(stats != nullptr);
stats->removeOperator(operator_id);
}
+ /**
+ * @brief Reset the probabilities and start learning again.
+ **/
void relearn() {
if (hasActiveQueries()) {
initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
}
}
+ /**
+ * @brief Check if there are any active queries in the Learner.
+ **/
inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
+ /**
+ * @brief Get the number of active queries in the Learner for the given
+ * priority level.
+ **/
inline const std::size_t getNumActiveQueriesInPriorityLevel(
const std::size_t priority_level) const {
const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
}
}
+ /**
+ * @brief Get the total number of active queries in the Learner.
+ **/
inline const std::size_t getTotalNumActiveQueries() const {
return query_id_to_priority_lookup_.size();
}
@@ -115,6 +146,83 @@ class Learner {
return highest_priority_level_;
}
+ /**
+ * @brief Randomly pick a priority level.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A priority level. If no queries are present in the learner, return
+ * kInvalidPriorityLevel.
+ **/
+ inline const int pickRandomPriorityLevel() const {
+ if (hasActiveQueries()) {
+ const int result = static_cast<int>(
+ probabilities_of_priority_levels_->pickRandomProperty());
+ /*LOG(INFO) << "Random priority level: " << result << " has "
+ << current_probabilities_.find(result)->second->getNumObjects()
+ << " queries";*/
+ return result;
+ } else {
+ return kInvalidPriorityLevel;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from any priority level in the learner.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A query ID. If no queries are present in the learner, return
+ * kInvalidQueryID.
+ **/
+ inline const int pickRandomQuery() const {
+ if (hasActiveQueries()) {
+ const int random_priority_level = pickRandomPriorityLevel();
+ // Note : The valid priority level values are non-zero.
+ DCHECK_GT(random_priority_level, 0);
+ const int result = pickRandomQueryFromPriorityLevel(
+ static_cast<std::size_t>(random_priority_level));
+ // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
+ return result;
+ } else {
+ // LOG(INFO) << "No active query right now";
+ return kInvalidQueryID;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from a given priority level in the learner.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A query ID. If no queries are present for this priority level in
+ * the learner, return kInvalidQueryID.
+ **/
+ inline const int pickRandomQueryFromPriorityLevel(
+ const std::size_t priority_level) const {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (hasActiveQueries()) {
+ if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+ DCHECK(current_probabilities_.at(priority_level) != nullptr);
+ const auto it = current_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ current_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ } else {
+ DCHECK(default_probabilities_.at(priority_level) != nullptr);
+ const auto it = default_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ default_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ }
+ }
+ return kInvalidQueryID;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
**/
inline bool hasFeedbackFromAllQueriesInPriorityLevel(
const std::size_t priority_level) const {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // return has_feedback_from_all_queries_.at(priority_level);
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_.at(priority_level);
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
inline void updateFeedbackFromQueriesInPriorityLevel(
const std::size_t priority_level) {
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
- &stats_vector = execution_stats_.at(priority_level);
+ &stats_vector = execution_stats_[priority_level];
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
DCHECK(stats_vector[i].second != nullptr);
if (!stats_vector[i].second->hasStats()) {
// At least one query has no statistics so far.
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = false;
return;
}
}
// All the queries have at least one execution statistic.
- has_feedback_from_all_queries_[priority_level] = true;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = true;
}
/**
@@ -313,31 +426,36 @@ class Learner {
}
/**
- * @param mean_workorder_per_query A vector of pairs in which:
- * 1st element is mean time per work order
- * 2nd element is the query ID.
+ * @param mean_workorder_per_query An unordered_map in which:
+ * 1st element is the query ID.
+ * 2nd element is mean time per work order
*
* @note If any query has mean work order time as 0, we return 0 as the
* denominator.
*
* @return The denominator to be used for probability calculations.
**/
- inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
- &mean_workorder_per_query) const {
+ inline float calculateDenominator(
+ const std::unordered_map<std::size_t, std::size_t>
+ &mean_workorder_per_query) const {
float denominator = 0;
for (const auto &element : mean_workorder_per_query) {
if (element.second != 0) {
denominator += 1/static_cast<float>(element.second);
- } else {
- return 0;
+ /*} else {
+ return 0;*/
}
}
return denominator;
}
inline bool hasFeedbackFromAllPriorityLevels() const {
- for (auto feedback : has_feedback_from_all_queries_) {
- if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+ // for (auto feedback : has_feedback_from_all_queries_) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ for (auto priority_iter = default_probabilities_.cbegin();
+ priority_iter != default_probabilities_.cend();
+ ++priority_iter) {
+ if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
return false;
}
}
@@ -369,9 +487,10 @@ class Learner {
// ProbabilityStrore for probabilities mapped to the priority levels.
std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Key = priority level. Value = A boolean that indicates if we have received
// feedback from all the queries in the given priority level.
- std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
int highest_priority_level_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : common_denominator_(1.0), mt_(std::random_device()()) {}
+ : common_denominator_(1.0) {}
/**
* @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
/**
* @brief Return a randomly chosen property.
*
+ * TODO(harshad) - If it is expensive to create the random device
+ * on every invocation of this function, make it a class variable.
+ * In which case, we won't be able to mark the function as const.
+ *
* @note The random number is uniformly chosen.
**/
- inline const std::size_t pickRandomProperty() {
+ inline const std::size_t pickRandomProperty() const {
std::uniform_real_distribution<float> dist(0.0, 1.0);
- const float chosen_probability = dist(mt_);
+ std::random_device rd;
+ const float chosen_probability = dist(rd);
return getPropertyForProbability(chosen_probability);
}
@@ -260,7 +265,7 @@ class ProbabilityStore {
* or equal to the input cumulative probability.
**/
inline const std::size_t getPropertyForProbability(
- const float key_cumulative_probability) {
+ const float key_cumulative_probability) const {
DCHECK(!cumulative_probabilities_.empty());
// It doesn't matter in which order the objects are arranged in the
// cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
float common_denominator_;
- std::mt19937_64 mt_;
-
DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/63c12a1a/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
// Randomize the orders.
std::random_device rd;
- std::mt19937 g(rd());
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
std::shuffle(priorities_insertion_order.begin(),
priorities_insertion_order.end(),
- g);
+ g1);
std::shuffle(priorities_removal_order.begin(),
priorities_removal_order.end(),
- g);
+ g2);
Learner learner;
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
}
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+ // Checks that pickRandomPriorityLevel() only returns a priority level that
+ // currently has an active query, and kInvalidPriorityLevel when none do.
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Note: Priority level should be non-zero, hence we begin from 1.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g1);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // First insert the queries in the order of priorities as defined by
+ // priorities_insertion_order.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // Note that the query ID is kept the same as priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // The picked level must be one of the levels inserted so far, i.e. it must
+ // lie in [begin, it + 1). On a miss std::find returns its `last` argument
+ // (it + 1), so that is the sentinel to compare against, not end().
+ auto find_priority_level_it = std::find(
+ priorities_insertion_order.begin(), it + 1, picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != it + 1);
+ }
+
+ // Repeat the tests a few more times.
+ const std::size_t kNumTests = 200;
+ for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_insertion_order vector.
+ auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+ }
+
+ // Now remove the queries in the order of priorities as defined by
+ // priorities_removal_order.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Recall that the query ID is the same as priority level.
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_removal_order vector.
+ auto find_priority_level_it = std::find(
+ it, priorities_removal_order.end(), picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one queries for a given priority level.
+
+ // Also, in this test we don't send any completion feedback message to the
+ // learner. Therefore it always refers to the default probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // The picked query must be one of the queries inserted so far, i.e. it
+ // must lie in [begin, it + 1). On a miss std::find returns its `last`
+ // argument (it + 1), so that is the sentinel to compare against.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != it + 1);
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one queries for a given priority level.
+
+ // In this test we send completion feedback messages for all the queries
+ // to the learner. Therefore it refers to the current probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // The picked query must be one of the queries inserted so far, i.e. it
+ // must lie in [begin, it + 1). On a miss std::find returns its `last`
+ // argument (it + 1), so that is the sentinel to compare against.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != it + 1);
+ }
+
+ // Now send one completion feedback message per query to the learner.
+ const std::size_t kOperatorID = 0;
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
} // namespace quickstep
[10/18] incubator-quickstep git commit: Added test for adding
completion message feedback.
Posted by hb...@apache.org.
Added test for adding completion message feedback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/45feea30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/45feea30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/45feea30
Branch: refs/heads/scheduler++
Commit: 45feea304fc48e501291a4c85e95fded2e1f04e4
Parents: 3838764
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 12:09:22 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/Learner.cpp | 4 +-
query_execution/tests/Learner_unittest.cpp | 82 ++++++++++++++++++++++++-
3 files changed, 82 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/45feea30/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 3904185..ef1ce99 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -274,6 +274,7 @@ target_link_libraries(Learner_unittest
gtest
gtest_main
quickstep_queryexecution_Learner
+ quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryoptimizer_QueryHandle)
add_test(Learner_unittest Learner_unittest)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/45feea30/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 0f17e7a..c7a7064 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -51,10 +51,11 @@ void Learner::addCompletionFeedback(
workorder_completion_proto.execution_time_in_microseconds(),
workorder_completion_proto.operator_index());
- // updateProbability();
if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
updateFeedbackFromQueriesInPriorityLevel(priority_level);
}
+ updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
+ updateProbabilitiesOfAllPriorityLevels();
}
void Learner::updateProbabilitiesForQueriesInPriorityLevel(
@@ -67,7 +68,6 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
return;
} else if (execution_stats_[priority_level].size() == 1u) {
DCHECK(current_probabilities_[priority_level] != nullptr);
- DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
// As we want the probability of the lone query in this priority level as
// 1, we set the numerator same as denominator.
const std::size_t numerator =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/45feea30/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index a1a144d..556c984 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -20,11 +20,26 @@
#include "gtest/gtest.h"
#include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_optimizer/QueryHandle.hpp"
namespace quickstep {
-TEST(LearnerTest, AddAndRemoveQueryTest) {
+// Test fixture for the Learner tests; provides a helper that builds mock
+// work order completion messages.
+class LearnerTest : public ::testing::Test {
+ protected:
+ // Builds a NormalWorkOrderCompletionMessage for the given query and
+ // operator. The worker thread index is fixed at 0 and the execution time
+ // at 10 microseconds. Tests pass the result to
+ // Learner::addCompletionFeedback().
+ serialization::NormalWorkOrderCompletionMessage createMockCompletionMessage(
+ const std::size_t query_id, const std::size_t operator_id) {
+ serialization::NormalWorkOrderCompletionMessage mock_proto_message;
+ mock_proto_message.set_operator_index(operator_id);
+ mock_proto_message.set_query_id(query_id);
+ mock_proto_message.set_worker_thread_index(0);
+ mock_proto_message.set_execution_time_in_microseconds(10);
+
+ return mock_proto_message;
+ }
+};
+
+TEST_F(LearnerTest, AddAndRemoveQueryTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle;
const std::size_t kPriorityLevel = 1;
@@ -53,7 +68,7 @@ TEST(LearnerTest, AddAndRemoveQueryTest) {
EXPECT_FALSE(learner.hasActiveQueries());
}
-TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle1, handle2;
const std::size_t kPriorityLevel = 1;
@@ -85,7 +100,7 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
-TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle1, handle2;
const std::size_t kPriorityLevel1 = 1;
@@ -123,4 +138,65 @@ TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
}
+TEST_F(LearnerTest, AddCompletionFeedbackSamePriorityLevelTest) {
+ // Adds two queries to the same priority level. Right after each query is
+ // added, one completion feedback message is sent for it. The test checks
+ // that the feedback leaves the learner's active-query counts intact.
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel = 1;
+ handle1.reset(new QueryHandle(1, kPriorityLevel));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ learner.addQuery(*handle1);
+ // Feedback for the first (and, at this point, only) query in the level.
+ serialization::NormalWorkOrderCompletionMessage completion_message =
+ createMockCompletionMessage(handle1->query_id(), 0);
+
+ learner.addCompletionFeedback(completion_message);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ // A second query in the same level, followed by feedback for it.
+ handle2.reset(new QueryHandle(2, kPriorityLevel));
+ learner.addQuery(*handle2);
+ completion_message = createMockCompletionMessage(handle2->query_id(), 0);
+ learner.addCompletionFeedback(completion_message);
+
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+}
+
+TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
+ // Places one query in each of two priority levels. It then repeatedly sends
+ // completion feedback for both queries in alternation. After every message,
+ // it checks that the learner still reports the same active queries.
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel1 = 1;
+ const std::size_t kPriorityLevel2 = 2;
+ handle1.reset(new QueryHandle(1, kPriorityLevel1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ learner.addQuery(*handle1);
+
+ handle2.reset(new QueryHandle(2, kPriorityLevel2));
+ learner.addQuery(*handle2);
+
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ // kNumIterations rounds; each round sends one message per query handle.
+ const std::size_t kNumIterations = 10;
+ std::vector<QueryHandle*> handles;
+ handles.emplace_back(handle1.get());
+ handles.emplace_back(handle2.get());
+ for (std::size_t iter_num = 0; iter_num < kNumIterations; ++iter_num) {
+ for (std::size_t index = 0; index < handles.size(); ++index) {
+ EXPECT_TRUE(learner.hasActiveQueries());
+ serialization::NormalWorkOrderCompletionMessage completion_message =
+ createMockCompletionMessage(handles[index]->query_id(), 0);
+ learner.addCompletionFeedback(completion_message);
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ }
+ }
+}
} // namespace quickstep
[18/18] incubator-quickstep git commit: Add individual query time
printing
Posted by hb...@apache.org.
Add individual query time printing
- Query entry/admission and completion times are made part of the query
handle class.
- Bug fix in PriorityPolicyEnforcer.
- Signed comparison bugfix.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ad8f0d3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ad8f0d3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ad8f0d3c
Branch: refs/heads/scheduler++
Commit: ad8f0d3c229b0bdea3838227f6d9cc24b133e347
Parents: bfa31b0
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 30 15:00:44 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:06 2016 -0500
----------------------------------------------------------------------
cli/QuickstepCli.cpp | 5 +++
query_execution/ExecutionStats.hpp | 2 +-
query_execution/Learner.cpp | 1 +
query_execution/Learner.hpp | 7 +++--
query_execution/PriorityPolicyEnforcer.cpp | 11 ++++---
query_execution/PriorityPolicyEnforcer.hpp | 4 +++
query_optimizer/QueryHandle.hpp | 41 +++++++++++++++++++++++++
7 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3010ccc..4906f7a 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -19,6 +19,7 @@
/* A standalone command-line interface to QuickStep */
+#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdio>
@@ -530,6 +531,10 @@ int main(int argc, char* argv[]) {
for (std::size_t i = 0; i < query_handles.size(); ++i) {
InputParserUtil::PrintAndDropOutputRelation(
query_handles[i], query_processor.get());
+ printf("Time: %s ms\n",
+ quickstep::DoubleToStringWithSignificantDigits(
+ query_handles[i]->getExecutionTimeMillis(), 3)
+ .c_str());
}
query_processor->saveCatalog();
if (quickstep::FLAGS_profile_and_report_workorder_perf) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 5643749..7b1a171 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -142,7 +142,7 @@ class ExecutionStats {
* these are the last N (=max_entries) entries.
**/
explicit OperatorStats(const std::size_t max_entries) : max_entries_(max_entries) {
- DCHECK_GE(max_entries, 0);
+ DCHECK_GE(max_entries, 0u);
}
inline std::pair<std::uint64_t, std::size_t> getStats() const {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 9801f60..183d688 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -245,6 +245,7 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
execution_stats_.erase(priority_level);
current_probabilities_.erase(priority_level);
probabilities_of_priority_levels_->removeObject(priority_level);
+ default_probabilities_.erase(priority_level);
// NOTE(harshad) : Not using this cache as it gets confusing.
// has_feedback_from_all_queries_.erase(priority_level);
if (hasActiveQueries()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index ef92db9..51f3967 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -176,7 +176,7 @@ class Learner {
if (hasActiveQueries()) {
const int random_priority_level = pickRandomPriorityLevel();
// Note : The valid priority level values are non-zero.
- DCHECK_GT(random_priority_level, 0);
+ DCHECK_GT(random_priority_level, 0u);
const int result = pickRandomQueryFromPriorityLevel(
static_cast<std::size_t>(random_priority_level));
return result;
@@ -273,10 +273,10 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
- CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
+ CHECK_GT(priority_level, 0u) << "Priority level should be non-zero";
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
- execution_stats_[priority_level];
+ execution_stats_.emplace(priority_level, std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>());
if (static_cast<int>(priority_level) > highest_priority_level_) {
highest_priority_level_ = priority_level;
}
@@ -380,6 +380,7 @@ class Learner {
const std::size_t priority_level) const {
// NOTE(harshad) : Not using this cache as it gets confusing.
// return has_feedback_from_all_queries_.at(priority_level);
+ DCHECK(isPriorityLevelPresent(priority_level));
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_.at(priority_level);
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 0a15094..93908a9 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -43,7 +43,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
@@ -52,9 +51,11 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
admitted_queries_[query_id].reset(
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
- LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+ LOG(INFO) << "Admitted query with ID: " << query_handle->query_id() << " priority: " << query_handle->query_priority();
priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
+ query_handle->setAdmissionTime();
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
return true;
} else {
LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -63,6 +64,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
} else {
// This query will have to wait.
LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
+ query_id_to_handle_[query_handle->query_id()] = query_handle;
waiting_queries_.push(query_handle);
return false;
}
@@ -224,10 +226,11 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
// No more queries for the given priority level. Remove the entry.
priority_query_ids_.erase(query_priority_unsigned);
}
+ query_id_to_handle_[query_id]->setCompletionTime();
// Remove the query from the learner.
learner_->removeQuery(query_id);
- LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
- // Admit waiting queries, if any.
+ // TODO(harshad) - Admit waiting queries, if any.
+ LOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
bool PriorityPolicyEnforcer::admitQueries(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index eafb099..1f13a10 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -218,6 +218,10 @@ class PriorityPolicyEnforcer {
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
+ // Key = query ID, value = a pointer to the QueryHandle.
+ // Note - This map has entries for active and waiting queries only.
+ std::unordered_map<std::size_t, QueryHandle*> query_id_to_handle_;
+
// Key = Query ID.
// Value = A tuple indicating a record of executing a work order.
// Within a tuple ...
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8f0d3c/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..04f672e 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -17,6 +17,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_QUERY_HANDLE_HPP_
+#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -119,6 +120,39 @@ class QueryHandle {
query_result_relation_ = relation;
}
+ // Record now (std::chrono::steady_clock) as the time the query entered
+ // the system.
+ void setEntryTime() {
+ entry_time_ = std::chrono::steady_clock::now();
+ }
+
+ // Record now (std::chrono::steady_clock) as the time the query was admitted.
+ void setAdmissionTime() {
+ admission_time_ = std::chrono::steady_clock::now();
+ }
+
+ // Record now (std::chrono::steady_clock) as the time the query completed.
+ void setCompletionTime() {
+ completion_time_ = std::chrono::steady_clock::now();
+ }
+
+ const std::chrono::steady_clock::time_point& getEntryTime() const {
+ return entry_time_;
+ }
+
+ const std::chrono::steady_clock::time_point& getAdmissionTime() const {
+ return admission_time_;
+ }
+
+ const std::chrono::steady_clock::time_point& getCompletionTime() const {
+ return completion_time_;
+ }
+
+ // Milliseconds elapsed between admission and completion. This is only
+ // meaningful once both setAdmissionTime() and setCompletionTime() have been
+ // called. Otherwise an unset time point keeps its default (epoch) value.
+ double getExecutionTimeMillis() const {
+ return std::chrono::duration<double, std::milli>(completion_time_ - admission_time_)
+ .count();
+ }
+
+ // Milliseconds elapsed between entry and admission. This is only meaningful
+ // once both setEntryTime() and setAdmissionTime() have been called.
+ double getWaitingTimeMillis() const {
+ return std::chrono::duration<double, std::milli>(admission_time_ - entry_time_).count();
+ }
+
private:
const std::size_t query_id_;
const std::uint64_t query_priority_;
@@ -134,6 +168,13 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;
+ // Time when query entered the system.
+ std::chrono::steady_clock::time_point entry_time_;
+ // Time when query was admitted to the system.
+ std::chrono::steady_clock::time_point admission_time_;
+ // Time when query finished its execution.
+ std::chrono::steady_clock::time_point completion_time_;
+
DISALLOW_COPY_AND_ASSIGN(QueryHandle);
};
[15/18] incubator-quickstep git commit: More conditions and logging
in PolicyEnforcer.
Posted by hb...@apache.org.
More conditions and logging in PolicyEnforcer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/10bc4d7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/10bc4d7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/10bc4d7e
Branch: refs/heads/scheduler++
Commit: 10bc4d7ed9a1bd6f25cd0e7b5e81e5082034fe0d
Parents: 17e6cdd
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 14:35:53 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:06 2016 -0500
----------------------------------------------------------------------
query_execution/PriorityPolicyEnforcer.cpp | 24 ++++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/10bc4d7e/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 6467367..f9a741d 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -174,18 +174,26 @@ void PriorityPolicyEnforcer::getWorkerMessages(
// While there are more priority levels to be checked ..
while (checked_priority_levels.size() < priority_query_ids_.size()) {
const int chosen_priority_level = learner_->pickRandomPriorityLevel();
- DCHECK(chosen_priority_level != kInvalidPriorityLevel);
- WorkerMessage *next_worker_message =
- getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
- &finished_queries_ids);
- if (next_worker_message != nullptr) {
- worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ if (chosen_priority_level == kInvalidPriorityLevel) {
+ LOG(INFO) << "No valid priority level chosen";
+ break;
+ } else if (checked_priority_levels.find(static_cast<std::size_t>(
+ chosen_priority_level)) != checked_priority_levels.end()) {
+ DLOG(INFO) << "The chosen priority level " << chosen_priority_level << " was checked already";
+ continue;
} else {
- checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
+ WorkerMessage *next_worker_message =
+ getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+ &finished_queries_ids);
+ if (next_worker_message != nullptr) {
+ worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ } else {
+ checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
+ }
}
}
} else {
- LOG(INFO) << "No active queries in the learner at this point.";
+ DLOG(INFO) << "No active queries in the learner at this point.";
return;
}
for (const std::size_t finished_qid : finished_queries_ids) {
[09/18] incubator-quickstep git commit: Select query from learner in
PolicyEnforcer.
Posted by hb...@apache.org.
Select query from learner in PolicyEnforcer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/17e6cdd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/17e6cdd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/17e6cdd8
Branch: refs/heads/scheduler++
Commit: 17e6cdd87c3fe1951fd8fb27963542812da9f5c8
Parents: 862fd21
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 12:25:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 5 +-
query_execution/Learner.hpp | 31 +++++----
query_execution/PriorityPolicyEnforcer.cpp | 86 +++++++++++++++++++++----
query_execution/PriorityPolicyEnforcer.hpp | 19 ++++++
4 files changed, 112 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17e6cdd8/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 11c3735..bb24baa 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -49,7 +49,7 @@ void Learner::addCompletionFeedback(
&workorder_completion_proto) {
const std::size_t query_id = workorder_completion_proto.query_id();
DCHECK(isQueryPresent(query_id));
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
ExecutionStats *execution_stats = getExecutionStats(query_id);
DCHECK(execution_stats != nullptr);
execution_stats->addEntry(
@@ -106,12 +106,10 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
query_id,
1 / static_cast<float>(mean_workorders_per_query[query_id]),
denominator);
- // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
// the older probabilities.
- // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
return;
}
}
@@ -248,7 +246,6 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
probabilities_of_priority_levels_->removeObject(priority_level);
// NOTE(harshad) : Not using this cache as it gets confusing.
// has_feedback_from_all_queries_.erase(priority_level);
- // LOG(INFO) << "Removed priority level: " << priority_level;
if (hasActiveQueries()) {
if (static_cast<int>(priority_level) == highest_priority_level_) {
// The priority level to be removed is the highest priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17e6cdd8/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 2c6fdef..8634842 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -70,7 +70,7 @@ class Learner {
**/
void removeQuery(const std::size_t query_id) {
DCHECK(isQueryPresent(query_id));
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
// Find the iterator to the query in execution_stats_.
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
@@ -158,9 +158,6 @@ class Learner {
if (hasActiveQueries()) {
const int result = static_cast<int>(
probabilities_of_priority_levels_->pickRandomProperty());
- /*LOG(INFO) << "Random priority level: " << result << " has "
- << current_probabilities_.find(result)->second->getNumObjects()
- << " queries";*/
return result;
} else {
return kInvalidPriorityLevel;
@@ -182,10 +179,8 @@ class Learner {
DCHECK_GT(random_priority_level, 0);
const int result = pickRandomQueryFromPriorityLevel(
static_cast<std::size_t>(random_priority_level));
- // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
return result;
} else {
- // LOG(INFO) << "No active query right now";
return kInvalidQueryID;
}
}
@@ -209,7 +204,6 @@ class Learner {
return static_cast<int>(
current_probabilities_.at(priority_level)->pickRandomProperty());
}
- // LOG(INFO) << "No queries in priority level: " << priority_level;
} else {
DCHECK(default_probabilities_.at(priority_level) != nullptr);
const auto it = default_probabilities_.find(priority_level);
@@ -217,12 +211,24 @@ class Learner {
return static_cast<int>(
default_probabilities_.at(priority_level)->pickRandomProperty());
}
- // LOG(INFO) << "No queries in priority level: " << priority_level;
}
}
return kInvalidQueryID;
}
+  /**
+   * @brief Look up the priority level of a query.
+   *
+   * @param query_id The ID of the query.
+   *
+   * @return The query's priority level if the learner knows about the query,
+   *         kInvalidPriorityLevel otherwise.
+   **/
+  inline int getQueryPriority(const std::size_t query_id) const {
+    const auto lookup_it = query_id_to_priority_lookup_.find(query_id);
+    if (lookup_it == query_id_to_priority_lookup_.end()) {
+      return kInvalidPriorityLevel;
+    }
+    return lookup_it->second;
+  }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -324,7 +330,7 @@ class Learner {
if (isQueryPresent(query_id)) {
const auto stats_iter = getExecutionStatsIterMutable(query_id);
DCHECK(stats_iter !=
- std::end(execution_stats_[getQueryPriority(query_id)]));
+ std::end(execution_stats_[getQueryPriorityUnsafe(query_id)]));
return stats_iter->second.get();
}
return nullptr;
@@ -339,7 +345,7 @@ class Learner {
inline std::vector<
std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
getExecutionStatsIterMutable(const std::size_t query_id) {
- const std::size_t priority_level = getQueryPriority(query_id);
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_[priority_level];
// The following line uses std::find_if to reach to the desired element
@@ -356,8 +362,11 @@ class Learner {
/**
* @brief Get a query's priority level given its ID.
+ *
+ * @note This version assumes that the given query ID exists in data
+ * structures.
**/
- inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+ inline const std::size_t getQueryPriorityUnsafe(const std::size_t query_id) const {
const auto it = query_id_to_priority_lookup_.find(query_id);
DCHECK(it != query_id_to_priority_lookup_.end());
return it->second;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17e6cdd8/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 44ccb0a..6467367 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -53,6 +53,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+ priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
learner_->addQuery(*query_handle);
return true;
} else {
@@ -71,6 +72,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
// QueryManager, so that we need to extract message from the
// TaggedMessage only once.
std::size_t query_id;
+ std::size_t operator_id;
switch (tagged_message.message_type()) {
case kWorkOrderCompleteMessage: {
serialization::NormalWorkOrderCompletionMessage proto;
@@ -79,6 +81,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ operator_id = proto.operator_index();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
learner_->addCompletionFeedback(proto);
@@ -94,6 +97,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
CHECK(proto.ParseFromArray(tagged_message.message(),
tagged_message.message_bytes()));
query_id = proto.query_id();
+ operator_id = proto.operator_index();
worker_directory_->decrementNumQueuedWorkOrders(
proto.worker_thread_index());
break;
@@ -132,6 +136,7 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
const QueryManager::QueryStatusCode return_code =
admitted_queries_[query_id]->processMessage(tagged_message);
+ //NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
if (!waiting_queries_.empty()) {
@@ -140,6 +145,8 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
waiting_queries_.pop();
admitQuery(new_query);
}
+ } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
+ learner_->removeOperator(query_id, operator_id);
}
}
@@ -161,26 +168,25 @@ void PriorityPolicyEnforcer::getWorkerMessages(
DCHECK_GT(per_query_share, 0u);
std::vector<std::size_t> finished_queries_ids;
- for (const auto &admitted_query_info : admitted_queries_) {
- QueryManager *curr_query_manager = admitted_query_info.second.get();
- DCHECK(curr_query_manager != nullptr);
- std::size_t messages_collected_curr_query = 0;
- while (messages_collected_curr_query < per_query_share) {
+ if (learner_->hasActiveQueries()) {
+ // Key = priority level. Value = Whether we have already checked the
+ std::unordered_map<std::size_t, bool> checked_priority_levels;
+ // While there are more priority levels to be checked ..
+ while (checked_priority_levels.size() < priority_query_ids_.size()) {
+ const int chosen_priority_level = learner_->pickRandomPriorityLevel();
+ DCHECK(chosen_priority_level != kInvalidPriorityLevel);
WorkerMessage *next_worker_message =
- curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+ getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
+ &finished_queries_ids);
if (next_worker_message != nullptr) {
- ++messages_collected_curr_query;
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
} else {
- // No more work ordes from the current query at this time.
- // Check if the query's execution is over.
- if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- // If the query has been executed, remove it.
- finished_queries_ids.push_back(admitted_query_info.first);
- }
- break;
+ checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
}
}
+ } else {
+ LOG(INFO) << "No active queries in the learner at this point.";
+ return;
}
for (const std::size_t finished_qid : finished_queries_ids) {
removeQuery(finished_qid);
@@ -194,6 +200,22 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
<< " that hasn't finished its execution";
}
admitted_queries_.erase(query_id);
+ // Remove the query from priority_query_ids_ structure.
+ const int query_priority = learner_->getQueryPriority(query_id);
+ DCHECK(query_priority != kInvalidPriorityLevel);
+ const std::size_t query_priority_unsigned =
+ static_cast<std::size_t>(query_priority);
+ std::vector<std::size_t> *query_ids_for_priority_level =
+ &priority_query_ids_[query_priority_unsigned];
+ query_ids_for_priority_level->erase(
+ std::remove(query_ids_for_priority_level->begin(),
+ query_ids_for_priority_level->end(),
+ query_id));
+ if (query_ids_for_priority_level->empty()) {
+ // No more queries for the given priority level. Remove the entry.
+ priority_query_ids_.erase(query_priority_unsigned);
+ }
+ // Remove the query from the learner.
learner_->removeQuery(query_id);
}
@@ -219,4 +241,40 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
proto.execution_time_in_microseconds());
}
+WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
+    const std::size_t priority_level,
+    std::vector<std::size_t> *finished_queries_ids) {
+  // Look the priority level up with find() rather than operator[]:
+  // operator[] would insert an empty entry for an unknown level. That entry
+  // is never removed (removeQuery() only erases levels of removed queries),
+  // so it would permanently inflate priority_query_ids_.size(), which the
+  // caller uses as its loop bound.
+  const auto level_it = priority_query_ids_.find(priority_level);
+  if (level_it == priority_query_ids_.end()) {
+    DLOG(INFO) << "No queries registered for priority level " << priority_level;
+    return nullptr;
+  }
+  // Nothing in the loop below modifies priority_query_ids_, so the count of
+  // queries in this level can be computed once.
+  const std::size_t num_queries_in_level = level_it->second.size();
+  // Key = query ID from the given priority level, value = whether we have
+  // checked this query earlier.
+  std::unordered_map<std::size_t, bool> checked_query_ids;
+  // While there are more queries to be checked ..
+  while (checked_query_ids.size() < num_queries_in_level) {
+    const int chosen_query_id =
+        learner_->pickRandomQueryFromPriorityLevel(priority_level);
+    if (chosen_query_id == kInvalidQueryID) {
+      // No query available at this time in this priority level.
+      return nullptr;
+    }
+    const std::size_t query_id = static_cast<std::size_t>(chosen_query_id);
+    if (checked_query_ids.find(query_id) != checked_query_ids.end()) {
+      // We have already seen this query ID, try one more time. This can
+      // happen many times per call, so keep it out of release logs.
+      DLOG(INFO) << "We have already seen this query, continue";
+      continue;
+    }
+    // We haven't seen this query earlier. Look it up with find() so that an
+    // unknown ID does not insert a null QueryManager into admitted_queries_.
+    const auto query_it = admitted_queries_.find(query_id);
+    DCHECK(query_it != admitted_queries_.end());
+    if (query_it == admitted_queries_.end() || query_it->second == nullptr) {
+      // Nothing can be scheduled from an unknown query; treat it as checked.
+      checked_query_ids[query_id] = true;
+      continue;
+    }
+    QueryManager *chosen_query_manager = query_it->second.get();
+    // Check if the query has any schedulable WorkOrder.
+    std::unique_ptr<WorkerMessage> next_worker_message(
+        chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+    if (next_worker_message != nullptr) {
+      // Ownership passes to the caller.
+      return next_worker_message.release();
+    }
+    // This query doesn't have any WorkerMessage right now. Mark as checked.
+    checked_query_ids[query_id] = true;
+    if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+      finished_queries_ids->emplace_back(query_id);
+    }
+  }
+  return nullptr;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17e6cdd8/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 94cbe38..281c066 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -183,6 +183,21 @@ class PriorityPolicyEnforcer {
void recordTimeForWorkOrder(
const serialization::NormalWorkOrderCompletionMessage &proto);
+  /**
+   * @brief Get a WorkerMessage from the chosen priority level.
+   *
+   * @param priority_level The priority level from which the query will be
+   *        chosen.
+   * @param finished_queries_ids A vector to which the IDs of queries that are
+   *        found to have finished their execution are appended.
+   *
+   * @return A WorkerMessage whose ownership passes to the caller. If no query
+   *         in this priority level has a schedulable WorkerMessage, return
+   *         nullptr.
+   **/
+  WorkerMessage *getNextWorkerMessageFromPriorityLevel(
+      const std::size_t priority_level,
+      std::vector<std::size_t> *finished_queries_ids);
+
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
@@ -193,6 +208,10 @@ class PriorityPolicyEnforcer {
tmb::MessageBus *bus_;
const bool profile_individual_workorders_;
+ // Key = priority level, value = a vector of IDs of the queries belonging to
+ // the key priority level.
+ std::unordered_map<std::size_t, std::vector<std::size_t>> priority_query_ids_;
+
// Key = query ID, value = QueryManager* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
[13/18] incubator-quickstep git commit: Created a class for storing
probabilities.
Posted by hb...@apache.org.
Created a class for storing probabilities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0100dee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0100dee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0100dee8
Branch: refs/heads/scheduler++
Commit: 0100dee8d7660ec8f08347f7e75308a3683a2a8e
Parents: 1794204
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 21 11:45:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 13 ++
query_execution/PolicyEnforcer.cpp | 1 +
query_execution/ProbabilityStore.hpp | 223 +++++++++++++++++++
.../tests/ProbabilityStore_unittest.cpp | 75 +++++++
4 files changed, 312 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0100dee8/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fcd4f48..18ae0da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -36,6 +36,7 @@ add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionSt
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
${queryexecution_QueryContext_proto_srcs}
@@ -97,6 +98,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManager
@@ -106,6 +108,9 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
quickstep_relationaloperators_WorkOrder
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_queryexecution_ProbabilityStore
+ glog
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_QueryContext
glog
quickstep_catalog_CatalogDatabaseLite
@@ -252,6 +257,14 @@ if (ENABLE_DISTRIBUTED)
add_test(BlockLocator_unittest BlockLocator_unittest)
endif()
+add_executable(ProbabilityStore_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
+target_link_libraries(ProbabilityStore_unittest
+ gtest
+ gtest_main
+ quickstep_queryexecution_ProbabilityStore)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
target_link_libraries(QueryManager_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0100dee8/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
#include "query_execution/WorkerDirectory.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0100dee8/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
new file mode 100644
index 0000000..8343d24
--- /dev/null
+++ b/query_execution/ProbabilityStore.hpp
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <random>
+#include <unordered_map>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief A class that stores the probabilities of objects. We use a field
+ *        called "property" to identify each object.
+ *
+ * @note Sampling lays the objects' individual probabilities end to end and
+ *       returns the object whose interval contains a uniformly chosen number
+ *       in [0, 1). The last interval is stretched to end at 1.0, so objects
+ *       are picked with their own probabilities only when the individual
+ *       probabilities sum to (about) 1.
+ **/
+class ProbabilityStore {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  ProbabilityStore()
+      : mt_(std::random_device()()) {}
+
+  /**
+   * @brief Get the number of objects in the store.
+   **/
+  std::size_t getNumObjects() const {
+    DCHECK_EQ(individual_probabilities_.size(), cumulative_probabilities_.size());
+    return individual_probabilities_.size();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probability for a given object.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param individual_probability The individual (not cumulative) probability
+   *        of the given object.
+   **/
+  void addProbability(const std::size_t property,
+                      const float individual_probability) {
+    individual_probabilities_[property] = individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probabilities for given objects.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param properties A vector of properties to be added.
+   * @param individual_probabilities The individual (not cumulative)
+   *        probabilities of the given objects.
+   **/
+  void addProbabilities(const std::vector<std::size_t> &properties,
+                        const std::vector<float> &individual_probabilities) {
+    DCHECK_EQ(properties.size(), individual_probabilities.size());
+    for (std::size_t i = 0; i < properties.size(); ++i) {
+      individual_probabilities_[properties[i]] = individual_probabilities[i];
+    }
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Update the probability of a given object to a new value.
+   *
+   * @param property The property of the object.
+   * @param new_individual_probability The new probability to be set.
+   **/
+  void updateProbability(const std::size_t property,
+                         const float new_individual_probability) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    it->second = new_individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Remove an object from the store.
+   *
+   * @param property The property of the object to be removed.
+   **/
+  void removeObject(const std::size_t property) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    individual_probabilities_.erase(it);
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Get the individual probability (not cumulative) for an object.
+   *
+   * @param property The property of the object.
+   **/
+  float getIndividualProbability(const std::size_t property) const {
+    const auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Update the cumulative probabilities.
+   *
+   * @note This function should be called upon if there are any updates,
+   *       additions or deletions to the individual probabilities.
+   * @note An efficient implementation should be written if there are large
+   *       number of objects.
+   **/
+  void updateCumulativeProbabilities() {
+    cumulative_probabilities_.clear();
+    if (individual_probabilities_.empty()) {
+      // No need to modify the cumulative probabilities.
+      return;
+    }
+    cumulative_probabilities_.reserve(individual_probabilities_.size());
+    float cumulative_probability = 0;
+    for (const auto &property_probability_pair : individual_probabilities_) {
+      // Each entry records the UPPER end of its object's interval, i.e. the
+      // running sum including the object's own probability. Recording the
+      // sum before adding it would shift every interval onto the next object
+      // under the upper_bound lookup in getPropertyForProbability().
+      cumulative_probability += property_probability_pair.second;
+      // Float rounding can push the running sum slightly above 1.0, which
+      // would trip the DCHECK in ProbabilityInfo's constructor.
+      cumulative_probabilities_.emplace_back(
+          property_probability_pair.first,
+          std::min(cumulative_probability, 1.0f));
+    }
+    // Adjust the last cumulative probability manually to 1.0, so that floating
+    // addition related rounding issues are ignored.
+    cumulative_probabilities_.back().updateProbability(1.0);
+  }
+
+  /**
+   * @brief Return a randomly chosen property.
+   *
+   * @note The random number is uniformly chosen.
+   **/
+  inline std::size_t pickRandomProperty() {
+    std::uniform_real_distribution<float> dist(0.0, 1.0);
+    const float chosen_probability = dist(mt_);
+    return getPropertyForProbability(chosen_probability);
+  }
+
+ private:
+  class ProbabilityInfo {
+   public:
+    ProbabilityInfo(const std::size_t property, const float probability)
+        : property_(property), probability_(probability) {
+      DCHECK_LE(probability, 1.0);
+    }
+
+    ProbabilityInfo(const ProbabilityInfo &other) = default;
+
+    ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
+
+    void updateProbability(const float new_probability) {
+      DCHECK_LE(new_probability, 1.0);
+      probability_ = new_probability;
+    }
+
+    std::size_t property_;
+    float probability_;
+  };
+
+  /**
+   * @brief Get a property for a given cumulative probability.
+   *
+   * @param key_cumulative_probability The input cumulative probability.
+   *
+   * @return The property of the first object whose cumulative probability is
+   *         strictly greater than the input cumulative probability, or of the
+   *         last object if there is none (i.e. the input is 1.0).
+   **/
+  inline std::size_t getPropertyForProbability(
+      const float key_cumulative_probability) {
+    DCHECK(!cumulative_probabilities_.empty());
+    // It doesn't matter in which order the objects are arranged in the
+    // cumulative_probabilities_ vector.
+    ProbabilityInfo search_key(0, key_cumulative_probability);
+    const auto it = std::upper_bound(
+        cumulative_probabilities_.begin(),
+        cumulative_probabilities_.end(),
+        search_key,
+        [](const ProbabilityInfo &a, const ProbabilityInfo &b) {
+          return a.probability_ < b.probability_;
+        });
+    if (it == cumulative_probabilities_.end()) {
+      // The key can equal 1.0: uniform_real_distribution<float> may return
+      // its upper bound because of rounding (LWG 2524). The top of the range
+      // belongs to the last object.
+      return cumulative_probabilities_.back().property_;
+    }
+    return it->property_;
+  }
+
+  std::unordered_map<std::size_t, float> individual_probabilities_;
+  std::vector<ProbabilityInfo> cumulative_probabilities_;
+
+  std::mt19937_64 mt_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0100dee8/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
new file mode 100644
index 0000000..e624557
--- /dev/null
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/ProbabilityStore.hpp"
+
+namespace quickstep {
+
+TEST(ProbabilityStoreTest, CountTest) {
+  ProbabilityStore store;
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  // A single object can be added and then removed again.
+  const std::size_t kProperty = 0;
+  store.addProbability(kProperty, 0.5);
+  EXPECT_EQ(1u, store.getNumObjects());
+  store.removeObject(kProperty);
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  // Adding several objects at once counts every one of them.
+  std::vector<std::size_t> properties {3, 5, 7, 9};
+  std::vector<float> individual_probabilities {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(properties, individual_probabilities);
+  EXPECT_EQ(properties.size(), store.getNumObjects());
+}
+
+TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
+  std::vector<std::size_t> properties {3, 5, 7, 9};
+  std::vector<float> individual_probabilities {0.2, 0.3, 0.4, 0.1};
+  ProbabilityStore store;
+  store.addProbabilities(properties, individual_probabilities);
+
+  // Each stored probability is returned exactly as it was given.
+  for (std::size_t i = 0; i < properties.size(); ++i) {
+    EXPECT_EQ(individual_probabilities[i],
+              store.getIndividualProbability(properties[i]));
+  }
+}
+
+TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(objects, probabilities);
+
+  // Every pick must be one of the stored objects, including after objects
+  // are removed one by one.
+  const std::size_t kNumTrials = 10;
+  while (!objects.empty()) {
+    for (std::size_t trial_num = 0; trial_num < kNumTrials; ++trial_num) {
+      const std::size_t picked_property = store.pickRandomProperty();
+      const auto it = std::find(objects.begin(), objects.end(), picked_property);
+      EXPECT_TRUE(it != objects.end());
+    }
+    const std::size_t property_to_be_removed = objects.back();
+    store.removeObject(property_to_be_removed);
+    objects.pop_back();
+    EXPECT_EQ(objects.size(), store.getNumObjects());
+  }
+}
+
+TEST(ProbabilityStoreTest, PickOnlyNonZeroProbabilityTest) {
+  // Exactly one object has a non-zero probability, so it must always be
+  // picked, regardless of the order in which the store keeps its objects.
+  // A membership check alone cannot detect intervals being assigned to the
+  // wrong objects; this degenerate distribution can.
+  ProbabilityStore store;
+  const std::vector<std::size_t> objects {3, 5, 7, 9};
+  const std::vector<float> probabilities {0.0, 0.0, 1.0, 0.0};
+  store.addProbabilities(objects, probabilities);
+
+  const std::size_t kNumTrials = 100;
+  for (std::size_t trial_num = 0; trial_num < kNumTrials; ++trial_num) {
+    EXPECT_EQ(7u, store.pickRandomProperty());
+  }
+}
+
+} // namespace quickstep
[11/18] incubator-quickstep git commit: Added GFLAG to learner.
Posted by hb...@apache.org.
Added GFLAG to learner.
- To control the number of work order execution statistics
that are maintained in the Learner, for a given query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/38387644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/38387644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/38387644
Branch: refs/heads/scheduler++
Commit: 38387644ff0fdb6bae08b26dcbebd5aaafc23cb5
Parents: 3e3dda8
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 11:39:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 25 +++++++-
query_execution/Learner.hpp | 76 ++++++++++++-------------
query_execution/tests/Learner_unittest.cpp | 17 +++---
3 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38387644/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 38a773b..0f17e7a 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -34,6 +34,11 @@
namespace quickstep {
+DEFINE_uint64(max_past_entries_learner,
+ 10,
+ "The maximum number of past WorkOrder execution statistics"
+ " entries for a query");
+
void Learner::addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto) {
@@ -90,8 +95,7 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
}
}
-void Learner::updateProbabilitiesOfAllPriorityLevels(
- const std::size_t priority_level) {
+void Learner::updateProbabilitiesOfAllPriorityLevels() {
if (!hasFeedbackFromAllPriorityLevels() ||
has_feedback_from_all_queries_.empty()) {
// Either we don't have enough feedback messages from all the priority
@@ -114,7 +118,7 @@ void Learner::updateProbabilitiesOfAllPriorityLevels(
total_time_curr_level += mean_workorder_entry.second;
}
const std::size_t num_queries_in_priority_level =
- execution_stats_[priority_level].size();
+ execution_stats_[curr_priority_level].size();
DCHECK_GT(num_queries_in_priority_level, 0u);
predicted_time_for_level[curr_priority_level] =
total_time_curr_level / num_queries_in_priority_level;
@@ -195,4 +199,19 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
}
}
+void Learner::initializeQuery(const QueryHandle &query_handle) {
+ const std::size_t priority_level = query_handle.query_priority();
+ const std::size_t query_id = query_handle.query_id();
+ DCHECK(isPriorityLevelPresent(priority_level));
+ query_id_to_priority_lookup_[query_id] = priority_level;
+ // TODO(harshad) - Create a gflag for max_past_entries_learner.
+ execution_stats_[priority_level].emplace_back(
+ query_id,
+ std::unique_ptr<ExecutionStats>(
+ new ExecutionStats(FLAGS_max_past_entries_learner)));
+ // As we are initializing the query, we obviously haven't gotten any
+ // feedback message for this query. Hence mark the following field as false.
+ has_feedback_from_all_queries_[priority_level] = false;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38387644/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index fb0e4cb..073b693 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -30,15 +30,10 @@
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
-#include "gflags/gflags.h"
#include "glog/logging.h"
namespace quickstep {
-/*DECLARE_int32(max_past_entries_learner,
- 10,
- "The maximum number of past WorkOrder execution statistics"
- " entries for a query");*/
/** \addtogroup QueryExecution
* @{
*/
@@ -94,14 +89,6 @@ class Learner {
}
}
- void updateProbabilitiesForQueriesInPriorityLevel(
- const std::size_t priority_level, const std::size_t query_id);
-
- // TODO(harshad) - Cache internal results from previous invocation of this
- // function and reuse them. There's a lot of redundancy in computations
- // at this point.
- void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
-
inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
@@ -122,6 +109,29 @@ class Learner {
private:
/**
+ * @brief Update the probabilities for queries in the given priority level.
+ *
+ * @note This function is called after the learner receives a completion
+ * feedback message from a given query.
+ *
+ * @param priority_level The priority level.
+ * @param query_id The ID of the query for which a completion feedback message
+ * has been received.
+ *
+ **/
+ void updateProbabilitiesForQueriesInPriorityLevel(
+ const std::size_t priority_level, const std::size_t query_id);
+
+ /**
+ * @brief Update the probabilities of all the priority levels.
+ *
+ * TODO(harshad) - Cache internal results from previous invocation of this
+ * function and reuse them. There's a lot of redundancy in computations
+ * at this point.
+ **/
+ void updateProbabilitiesOfAllPriorityLevels();
+
+ /**
* @brief Initialize the default probabilities for the queries.
**/
void initializeDefaultProbabilitiesForAllQueries();
@@ -135,18 +145,14 @@ class Learner {
* @brief Initialize the data structures for a given priority level, if none
* exist. If there are already data structures for the given priority
* level, do nothing.
+ *
+ * @note This function should be followed by a relearn() call, to insert this
+ * priority levels in probabilities_of_priority_levels_.
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
- // Calculate the default probability for the priority level here and use
- // it instead of 0.5 here.
- // TODO(harshad) - Correct this.
- /*const float new_denominator =
- probabilities_of_priority_levels_[priority_level]->getDenominator();
- probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
- priority_level, priority_level, new_denominator);*/
execution_stats_[priority_level];
}
}
@@ -169,6 +175,10 @@ class Learner {
/**
* @brief Check if the Learner has presence of the given priority level.
+ *
+ * @param priority_level The priority level.
+ *
+ * @return True if present, false otherwise.
**/
inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
DCHECK_EQ((current_probabilities_.find(priority_level) ==
@@ -178,7 +188,7 @@ class Learner {
}
/**
- * @brief Check if the query is present.
+ * @brief Check if the query is present in local data structures.
**/
inline bool isQueryPresent(const std::size_t query_id) const {
return query_id_to_priority_lookup_.find(query_id) !=
@@ -190,20 +200,7 @@ class Learner {
*
* @param query_handle The query handle for the new query.
**/
- void initializeQuery(const QueryHandle &query_handle) {
- const std::size_t priority_level = query_handle.query_priority();
- const std::size_t query_id = query_handle.query_id();
- DCHECK(isPriorityLevelPresent(priority_level));
- query_id_to_priority_lookup_[query_id] = priority_level;
- execution_stats_[priority_level].emplace_back(
- query_id,
- std::unique_ptr<ExecutionStats>(
- // new ExecutionStats(FLAGS_max_past_entries_learner)));
- new ExecutionStats(10)));
- // As we are initializing the query, we obviously haven't gotten any
- // feedback message for this query. Hence mark the following field as false.
- has_feedback_from_all_queries_[priority_level] = false;
- }
+ void initializeQuery(const QueryHandle &query_handle);
/**
* @brief Get the execution stats object for the given query.
@@ -222,9 +219,10 @@ class Learner {
}
/**
- * @brief This function works well when the query and priority level exists
- * in the data structures.
+ * @breif Get a mutable iterator to the execution stats for a given query.
*
+ * @note This function works well when the query and priority level exists
+ * in the data structures.
**/
inline std::vector<
std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
@@ -244,6 +242,9 @@ class Learner {
return stats_iter;
}
+ /**
+ * @brief Get a query's priority level given its ID.
+ **/
inline const std::size_t getQueryPriority(const std::size_t query_id) const {
const auto it = query_id_to_priority_lookup_.find(query_id);
DCHECK(it != query_id_to_priority_lookup_.end());
@@ -366,7 +367,6 @@ class Learner {
// Key = priority level. Value = A boolean that indicates if we have received
// feedback from all the queries in the given priority level.
- // TODO(harshad) - Invalidate the cache whenever needed.
std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
DISALLOW_COPY_AND_ASSIGN(Learner);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38387644/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 864bb22..a1a144d 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -27,28 +27,29 @@ namespace quickstep {
TEST(LearnerTest, AddAndRemoveQueryTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle;
- const std::size_t kPriorityLevel1 = 1;
- handle.reset(new QueryHandle(1, kPriorityLevel1));
+ const std::size_t kPriorityLevel = 1;
+ handle.reset(new QueryHandle(1, kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
learner.addQuery(*handle);
EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_TRUE(learner.hasActiveQueries());
+
learner.removeQuery(handle->query_id());
EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
- const std::size_t kPriorityLevel2 = 1;
- handle.reset(new QueryHandle(1, kPriorityLevel2));
+ handle.reset(new QueryHandle(2, kPriorityLevel));
learner.addQuery(*handle);
EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_TRUE(learner.hasActiveQueries());
+
learner.removeQuery(handle->query_id());
EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
}
[05/18] incubator-quickstep git commit: Created unit test for Learner
Posted by hb...@apache.org.
Created unit test for Learner
- API changes for ProbabilityStore (added hasObject()).
- Check if there's a probability entry for the query to be removed.
- Bug fix in removeQuery().
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5e7615e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5e7615e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5e7615e9
Branch: refs/heads/scheduler++
Commit: 5e7615e9b31101a6a1d3ce40de4350f4ae779992
Parents: c5f15ca
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 23:03:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 9 ++++
query_execution/Learner.cpp | 6 ++-
query_execution/Learner.hpp | 33 ++++++++++-----
query_execution/ProbabilityStore.hpp | 5 +++
query_execution/tests/Learner_unittest.cpp | 55 +++++++++++++++++++++++++
5 files changed, 96 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e7615e9/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index cb0f815..3904185 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -268,6 +268,15 @@ if (ENABLE_DISTRIBUTED)
add_test(BlockLocator_unittest BlockLocator_unittest)
endif()
+add_executable(Learner_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/Learner_unittest.cpp")
+target_link_libraries(Learner_unittest
+ gtest
+ gtest_main
+ quickstep_queryexecution_Learner
+ quickstep_queryoptimizer_QueryHandle)
+add_test(Learner_unittest Learner_unittest)
+
add_executable(ProbabilityStore_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
target_link_libraries(ProbabilityStore_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e7615e9/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 72c68f0..5d877b4 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -188,8 +188,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
priority_levels.emplace_back(priority_iter->first);
numerators.emplace_back(priority_iter->first);
}
- probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
- priority_levels, numerators, sum_priority_levels);
+ if (sum_priority_levels > 0) {
+ probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+ priority_levels, numerators, sum_priority_levels);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e7615e9/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 64120a7..9d51877 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -35,10 +35,10 @@
namespace quickstep {
-DEFINE_int32(max_past_entries_learner,
+/*DECLARE_int32(max_past_entries_learner,
10,
"The maximum number of past WorkOrder execution statistics"
- " entries for a query");
+ " entries for a query");*/
/** \addtogroup QueryExecution
* @{
*/
@@ -49,6 +49,7 @@ class Learner {
* @brief Constructor.
**/
Learner() {
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
}
void addCompletionFeedback(
@@ -67,7 +68,15 @@ class Learner {
const std::size_t priority_level = getQueryPriority(query_id);
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
- current_probabilities_[priority_level]->removeObject(query_id);
+ DCHECK(current_probabilities_.find(priority_level) !=
+ current_probabilities_.end());
+ if (current_probabilities_[priority_level]->hasObject(query_id)) {
+ // We may have cases when a query doesn't produce any feedback message,
+ // therefore we may not have an entry for this query in the
+ // current_probabilities_[priority_level] ProbabilityStore.
+ current_probabilities_[priority_level]->removeObject(query_id);
+ }
+ query_id_to_priority_lookup_.erase(query_id);
checkAndRemovePriorityLevel(priority_level);
relearn();
}
@@ -93,6 +102,10 @@ class Learner {
// at this point.
void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+ inline const std::size_t hasActiveQueries() const {
+ return !query_id_to_priority_lookup_.empty();
+ }
+
private:
/**
* @brief Initialize the default probabilities for the queries.
@@ -111,12 +124,15 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
- if (isPriorityLevelPresent(priority_level)) {
+ if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
// Calculate the default probability for the priority level here and use
// it instead of 0.5 here.
// TODO(harshad) - Correct this.
- probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+ /*const float new_denominator =
+ probabilities_of_priority_levels_[priority_level]->getDenominator();
+ probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
+ priority_level, priority_level, new_denominator);*/
execution_stats_[priority_level];
}
}
@@ -168,7 +184,8 @@ class Learner {
execution_stats_[priority_level].emplace_back(
query_id,
std::unique_ptr<ExecutionStats>(
- new ExecutionStats(FLAGS_max_past_entries_learner)));
+ // new ExecutionStats(FLAGS_max_past_entries_learner)));
+ new ExecutionStats(10)));
// As we are initializing the query, we obviously haven't gotten any
// feedback message for this query. Hence mark the following field as false.
has_feedback_from_all_queries_[priority_level] = false;
@@ -251,10 +268,6 @@ class Learner {
has_feedback_from_all_queries_[priority_level] = true;
}
- inline const std::size_t hasActiveQueries() const {
- return !query_id_to_priority_lookup_.empty();
- }
-
/**
* @brief Get the mean work order execution times for all the queries in
* a given priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e7615e9/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index d31caa6..347df89 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -55,6 +55,11 @@ class ProbabilityStore {
return common_denominator_;
}
+ inline bool hasObject(const std::size_t property) const {
+ auto it = individual_probabilities_.find(property);
+ return (it != individual_probabilities_.end());
+ }
+
/**
* @brief Add individual (not cumulative) probability for a given object.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e7615e9/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
new file mode 100644
index 0000000..cab241a
--- /dev/null
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <memory>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/Learner.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+namespace quickstep {
+
+TEST(LearnerTest, AddQueryTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle;
+ handle.reset(new QueryHandle(1, 1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+}
+
+TEST(LearnerTest, RemoveQueryTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle;
+ handle.reset(new QueryHandle(1, 1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+
+ handle.reset(new QueryHandle(2, 1));
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+}
+
+} // namespace quickstep
[16/18] incubator-quickstep git commit: AdmitRequest message function
accepts multiple queries.
Posted by hb...@apache.org.
AdmitRequest message function accepts multiple queries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/308e58a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/308e58a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/308e58a2
Branch: refs/heads/scheduler++
Commit: 308e58a20390df7e5a67939184657736b1645e29
Parents: 10bc4d7
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 15:10:51 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:06 2016 -0500
----------------------------------------------------------------------
cli/CommandExecutor.cpp | 4 +++-
cli/QuickstepCli.cpp | 4 +++-
query_execution/AdmitRequestMessage.hpp | 5 +++--
query_execution/QueryExecutionUtil.hpp | 4 ++--
query_optimizer/tests/ExecutionGeneratorTestRunner.cpp | 4 +++-
5 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/308e58a2/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7083ef5..440a30f 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -216,9 +216,11 @@ inline TypedValue executeQueryForSingleResult(
query_processor->generateQueryHandle(*result.parsed_statement));
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ std::vector<QueryHandle*> query_handles;
+ query_handles.push_back(query_handle.get());
// Use foreman to execute the query plan.
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+ main_thread_client_id, foreman_client_id, &query_handles, bus);
QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/308e58a2/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 3f99130..d7b687e 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -434,11 +434,13 @@ int main(int argc, char* argv[]) {
}
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ std::vector<QueryHandle*> query_handles;
+ query_handles.push_back(query_handle.get());
start = std::chrono::steady_clock::now();
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id,
foreman.getBusClientID(),
- query_handle.get(),
+ &query_handles,
&bus);
try {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/308e58a2/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp
index e33b354..0aefcbf 100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@ -41,8 +41,9 @@ class AdmitRequestMessage {
* @param query_handles The handles of the queries requesting to be admitted
* to the system.
**/
- explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
- : query_handles_(query_handles) {}
+ explicit AdmitRequestMessage(std::vector<QueryHandle*> *query_handles)
+ : query_handles_(*DCHECK_NOTNULL(query_handles)) {
+ }
/**
* @brief Constructor for requesting single query admission.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/308e58a2/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 6ea4a29..2b95f1f 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -74,7 +74,7 @@ class QueryExecutionUtil {
*
* @param sender_id The TMB client ID of the sender.
* @param receiver_id The TMB client ID of the receiver.
- * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+ * @param query_handle The QueryHandles used in the AdmitRequestMessage.
* @param bus A pointer to the TMB.
* @param tagged_message A moved from reference to the tagged message.
*
@@ -84,7 +84,7 @@ class QueryExecutionUtil {
static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
const tmb::client_id sender_id,
const tmb::client_id receiver_id,
- QueryHandle *query_handle,
+ std::vector<QueryHandle*> *query_handle,
MessageBus *bus) {
std::unique_ptr<AdmitRequestMessage> request_message(
new AdmitRequestMessage(query_handle));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/308e58a2/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 8c1d306..92baf19 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -96,10 +96,12 @@ void ExecutionGeneratorTestRunner::runTestCase(
logical_generator.generatePlan(*result.parsed_statement));
execution_generator.generatePlan(physical_plan);
+ std::vector<QueryHandle*> query_handles;
+ query_handles.push_back(&query_handle);
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_,
foreman_->getBusClientID(),
- &query_handle,
+ &query_handles,
&bus_);
QueryExecutionUtil::ReceiveQueryCompletionMessage(
[14/18] incubator-quickstep git commit: API to find the highest
priority level in the learner
Posted by hb...@apache.org.
API to find the highest priority level in the learner
- Unit tests to test the feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b65cc2f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b65cc2f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b65cc2f4
Branch: refs/heads/scheduler++
Commit: b65cc2f43c85f272fcc4f4c32490fdf60d93bf43
Parents: 45feea3
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 12 +++--
query_execution/Learner.cpp | 33 +++++++++++++
query_execution/Learner.hpp | 30 +++++++-----
query_execution/ProbabilityStore.hpp | 10 ++--
query_execution/QueryExecutionTypedefs.hpp | 2 +
query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
glog
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
- ${GFLAGS_LIB_NAME}
+ ${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
glog
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_ProbabilityStore
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
gtest
gtest_main
quickstep_queryexecution_Learner
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest)
+add_test(Learner_unittest Learner_unittest)
add_executable(ProbabilityStore_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
gtest
gtest_main
quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
"The maximum number of past WorkOrder execution statistics"
" entries for a query");
+Learner::Learner()
+ : highest_priority_level_(kInvalidPriorityLevel) {
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
+}
+
void Learner::addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
has_feedback_from_all_queries_[priority_level] = false;
}
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (execution_stats_[priority_level].empty()) {
+ execution_stats_.erase(priority_level);
+ current_probabilities_.erase(priority_level);
+ probabilities_of_priority_levels_->removeObject(priority_level);
+ has_feedback_from_all_queries_.erase(priority_level);
+ if (hasActiveQueries()) {
+ if (static_cast<int>(priority_level) == highest_priority_level_) {
+ // The priority level to be removed is the highest priority level.
+ std::size_t new_highest_priority_level = 0;
+ // Find the new highest priority level.
+ for (auto priority_level_it = execution_stats_.cbegin();
+ priority_level_it != execution_stats_.cend();
+ ++priority_level_it) {
+ if (priority_level_it->first > new_highest_priority_level) {
+ new_highest_priority_level = priority_level_it->first;
+ }
+ }
+ highest_priority_level_ = static_cast<int>(new_highest_priority_level);
+ }
+ } else {
+ highest_priority_level_ = kInvalidPriorityLevel;
+ }
+ }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
/**
* @brief Constructor.
**/
- Learner() {
- probabilities_of_priority_levels_.reset(new ProbabilityStore());
- }
+ Learner();
void addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
return query_id_to_priority_lookup_.size();
}
+ /**
+ * @brief Get the highest priority level among the active queries.
+ *
+ * @return The highest priority level. If the system is empty it returns
+ * kInvalidPriorityLevel.
+ **/
+ inline const int getHighestPriorityLevel() const {
+ return highest_priority_level_;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
+ CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
execution_stats_[priority_level];
+ if (static_cast<int>(priority_level) > highest_priority_level_) {
+ highest_priority_level_ = priority_level;
+ }
}
}
@@ -163,15 +175,7 @@ class Learner {
*
* @param priority_level The priority level.
**/
- inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
- DCHECK(isPriorityLevelPresent(priority_level));
- if (execution_stats_[priority_level].empty()) {
- execution_stats_.erase(priority_level);
- current_probabilities_.erase(priority_level);
- probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
- }
- }
+ void checkAndRemovePriorityLevel(const std::size_t priority_level);
/**
* @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
// feedback from all the queries in the given priority level.
std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ int highest_priority_level_;
+
DISALLOW_COPY_AND_ASSIGN(Learner);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
cumulative_probability);
cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that
+ DCHECK(!cumulative_probabilities_.empty());
+ // Adjust the last cumulative probability manually to 1, so that
// floating addition related rounding issues are ignored.
- cumulative_probabilities_.back().updateProbability(1.0);
+ cumulative_probabilities_.back().updateProbability(1);
}
/**
@@ -233,7 +234,9 @@ class ProbabilityStore {
public:
ProbabilityInfo(const std::size_t property, const float probability)
: property_(property), probability_(probability) {
- DCHECK_LE(probability, 1.0);
+ // No DCHECK_LE(probability, 1.0): glog's DCHECK_LE does work with floats,
+ // but floating-point rounding may leave a probability marginally above 1.0.
+ // NOTE(review): presumed rationale for dropping the check; please confirm.
}
ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
void updateProbability(const float new_probability) {
- DCHECK_LE(new_probability, 1.0);
probability_ = new_probability;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
+const int kInvalidPriorityLevel = -1;
+
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
'l',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b65cc2f4/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
* limitations under the License.
**/
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
#include <memory>
+#include <random>
+#include <vector>
#include "gtest/gtest.h"
#include "query_execution/Learner.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
}
}
}
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Note: Priority level should be non-zero, hence we begin from 1.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // First insert the queries in the order of priorities as defined by
+ // priorities_insertion_order.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // Note that the query ID is kept the same as priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t max_priority_so_far =
+ *(std::max_element(priorities_insertion_order.begin(), it + 1));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ }
+ // Now remove the queries in the order of priorities as defined by
+ // priorities_removal_order.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Recall that the query ID is the same as priority level.
+ const std::size_t max_priority_so_far =
+ *(std::max_element(it, priorities_removal_order.end()));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
} // namespace quickstep
[08/18] incubator-quickstep git commit: Get number of active queries
(total and by priority level)
Posted by hb...@apache.org.
Get number of active queries (total and by priority level)
- Unit tests to check the feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7ea9d32a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7ea9d32a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7ea9d32a
Branch: refs/heads/scheduler++
Commit: 7ea9d32a95018c8780c03fd47386f0afbfd7cda8
Parents: 5e7615e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:42:56 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.hpp | 16 +++++++-
query_execution/tests/Learner_unittest.cpp | 54 ++++++++++++++++++++-----
2 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7ea9d32a/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 9d51877..fb0e4cb 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -102,10 +102,24 @@ class Learner {
// at this point.
void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
- inline const std::size_t hasActiveQueries() const {
+ inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
+ inline const std::size_t getNumActiveQueriesInPriorityLevel(
+ const std::size_t priority_level) const {
+ const auto it = execution_stats_.find(priority_level);
+ if (it != execution_stats_.end()) {
+ return it->second.size();
+ } else {
+ return 0;
+ }
+ }
+
+ inline const std::size_t getTotalNumActiveQueries() const {
+ return query_id_to_priority_lookup_.size();
+ }
+
private:
/**
* @brief Initialize the default probabilities for the queries.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7ea9d32a/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index cab241a..74353f0 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -24,32 +24,64 @@
namespace quickstep {
-TEST(LearnerTest, AddQueryTest) {
+TEST(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
-  handle.reset(new QueryHandle(1, 1));
+  const std::size_t kPriorityLevel1 = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel1));
   EXPECT_FALSE(learner.hasActiveQueries());
   learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
   EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_FALSE(learner.hasActiveQueries());
+  // Repeat the add/remove cycle with a second, distinct priority level.
+  const std::size_t kPriorityLevel2 = 2;
+  handle.reset(new QueryHandle(1, kPriorityLevel2));
+  learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_FALSE(learner.hasActiveQueries());
 }
-TEST(LearnerTest, RemoveQueryTest) {
+TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
Learner learner;
- std::unique_ptr<QueryHandle> handle;
- handle.reset(new QueryHandle(1, 1));
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel = 1;
+ handle1.reset(new QueryHandle(1, kPriorityLevel));
+ handle2.reset(new QueryHandle(2, kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
- learner.addQuery(*handle);
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ learner.addQuery(*handle1);
EXPECT_TRUE(learner.hasActiveQueries());
- learner.removeQuery(handle->query_id());
- EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+ learner.addQuery(*handle2);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
- handle.reset(new QueryHandle(2, 1));
- learner.addQuery(*handle);
+ learner.removeQuery(handle1->query_id());
EXPECT_TRUE(learner.hasActiveQueries());
- learner.removeQuery(handle->query_id());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ learner.removeQuery(handle2->query_id());
+
EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
} // namespace quickstep
[06/18] incubator-quickstep git commit: Added ExecutionStats class
Posted by hb...@apache.org.
Added ExecutionStats class
- To keep track of query execution statistics for a given query.
- The stats class organizes execution time on a per-operator basis.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/17942041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/17942041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/17942041
Branch: refs/heads/scheduler++
Commit: 179420411bcc0191066d74af0a7cf0a19dccc35f
Parents: b258821
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 19 10:06:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 6 ++
query_execution/ExecutionStats.hpp | 177 ++++++++++++++++++++++++++++++++
query_execution/PolicyEnforcer.hpp | 2 +-
3 files changed, 184 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17942041/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..fcd4f48 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -32,6 +32,7 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -69,6 +70,9 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_Macros
tmb)
endif()
+target_link_libraries(quickstep_queryexecution_ExecutionStats
+ glog
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
${GFLAGS_LIB_NAME}
glog
@@ -91,6 +95,7 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
+ quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -199,6 +204,7 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_Foreman
quickstep_queryexecution_ForemanLite
quickstep_queryexecution_PolicyEnforcer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17942041/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
new file mode 100644
index 0000000..f28f367
--- /dev/null
+++ b/query_execution/ExecutionStats.hpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief Record the execution stats of a query.
+ **/
+class ExecutionStats {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param max_entries The maximum number of entries we remember for each
+   *        operator. Must be non-zero.
+   **/
+  explicit ExecutionStats(const std::size_t max_entries)
+      : max_entries_(max_entries), cached_stats_(std::make_pair(0, 0)) {}
+
+  /**
+   * @brief Get the number of active operators in stats.
+   **/
+  const std::size_t getNumActiveOperators() const {
+    return active_operators_.size();
+  }
+
+  /**
+   * @brief Get the current stats.
+   *
+   * @note This function updates the cache, hence it can't be const. We are lazy
+   *       in updating the cache, instead of eagerly updating the cache upon
+   *       each update.
+   *
+   * @return A pair - 1st element is total time, 2nd element is total number of
+   *         WorkOrders for the whole query. If there are no active operators,
+   *         or either total is 0, the previously cached pair is returned.
+   **/
+  std::pair<std::uint64_t, std::uint64_t> getCurrentStats() {
+    if (active_operators_.empty()) {
+      return cached_stats_;
+    }
+    std::pair<std::uint64_t, std::uint64_t> result = std::make_pair(0, 0);
+    for (const auto &op_entry : active_operators_) {
+      DCHECK(op_entry.second != nullptr);
+      const std::pair<std::uint64_t, std::size_t> op_stats =
+          op_entry.second->getStats();
+      result.first += op_stats.first;
+      result.second += op_stats.second;
+    }
+    if (result.first == 0 || result.second == 0) {
+      // If one of the elements in the pair is 0, use the old result.
+      return cached_stats_;
+    }
+    cached_stats_ = result;
+    return result;
+  }
+
+  /**
+   * @brief Add a new entry to stats.
+   *
+   * @param value The value to be added.
+   * @param operator_index The operator index which the value belongs to.
+   **/
+  void addEntry(std::size_t value, std::size_t operator_index) {
+    auto &op_stats = active_operators_[operator_index];
+    if (op_stats == nullptr) {
+      // First entry for this operator: create its OperatorStats object.
+      op_stats.reset(new OperatorStats(max_entries_));
+    }
+    // Record the value, including the very first one for a new operator.
+    op_stats->addEntry(value);
+  }
+
+  /**
+   * @brief Remove the operator with given index. This should be called only
+   *        when the given operator finishes its execution.
+   **/
+  void removeOperator(std::size_t operator_index) {
+    DCHECK(hasOperator(operator_index));
+    active_operators_.erase(operator_index);
+  }
+
+ private:
+  /**
+   * @brief Stats for an operator within the query.
+   *
+   * @note We remember only the last N entries for the operator.
+   **/
+  class OperatorStats {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param max_entries The maximum number of entries we remember (must be
+     *        non-zero). Typically these are the last N (=max_entries) entries.
+     **/
+    explicit OperatorStats(const std::size_t max_entries) : max_entries_(max_entries) {
+      DCHECK_GT(max_entries, 0u);
+    }
+
+    inline std::pair<std::uint64_t, std::size_t> getStats() const {
+      std::uint64_t total_time = 0;  // Must be 64-bit: an int sum truncates.
+      for (const std::uint64_t time_value : times_) total_time += time_value;
+      return std::make_pair(total_time, times_.size());
+    }
+
+    inline void addEntry(std::uint64_t time_value) {
+      if (times_.size() == max_entries_) {
+        times_.pop_front();
+      }
+      times_.push_back(time_value);
+      DCHECK_LE(times_.size(), max_entries_);
+    }
+
+   private:
+    const std::size_t max_entries_;
+    std::deque<std::uint64_t> times_;
+
+    DISALLOW_COPY_AND_ASSIGN(OperatorStats);
+  };
+
+  /**
+   * @brief Check if the operator with given index is present in the stats.
+   **/
+  inline bool hasOperator(const std::size_t operator_index) const {
+    return active_operators_.find(operator_index) != active_operators_.end();
+  }
+
+  const std::size_t max_entries_;
+
+  std::unordered_map<std::size_t, std::unique_ptr<OperatorStats>>
+      active_operators_;
+
+  // Cached stats for the whole query.
+  std::pair<std::uint64_t, std::uint64_t> cached_stats_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExecutionStats);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17942041/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..b7f5735 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -214,4 +214,4 @@ class PolicyEnforcer {
} // namespace quickstep
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
[02/18] incubator-quickstep git commit: Created
PriorityPolicyEnforcer class.
Posted by hb...@apache.org.
Created PriorityPolicyEnforcer class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/862fd216
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/862fd216
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/862fd216
Branch: refs/heads/scheduler++
Commit: 862fd21628acb0fad59387769382d79d2ad9d253
Parents: c1a44e2
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 28 09:49:29 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 19 +-
query_execution/Foreman.cpp | 2 +-
query_execution/Foreman.hpp | 4 +-
query_execution/PolicyEnforcer.cpp | 2 -
query_execution/PriorityPolicyEnforcer.cpp | 222 ++++++++++++++++++++++++
query_execution/PriorityPolicyEnforcer.hpp | 222 ++++++++++++++++++++++++
6 files changed, 465 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4639617..104f9da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_PriorityPolicyEnforcer PriorityPolicyEnforcer.cpp PriorityPolicyEnforcer.hpp)
add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
@@ -80,7 +81,7 @@ target_link_libraries(quickstep_queryexecution_Foreman
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
- quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PriorityPolicyEnforcer
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_WorkerDirectory
@@ -108,6 +109,21 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_ProbabilityStore
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryManager
+ quickstep_queryexecution_WorkerDirectory
+ quickstep_queryexecution_WorkerMessage
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_relationaloperators_WorkOrder
+ quickstep_utility_Macros
+ tmb)
+target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer
+ ${GFLAGS_LIB_NAME}
+ glog
+ quickstep_queryexecution_ExecutionStats
+ quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_Learner
quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
@@ -225,6 +241,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_ForemanLite
quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
+ quickstep_queryexecution_PriorityPolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index f9f2e7a..0898ac1 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -87,7 +87,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
- policy_enforcer_.reset(new PolicyEnforcer(
+ policy_enforcer_.reset(new PriorityPolicyEnforcer(
foreman_client_id_,
num_numa_nodes,
catalog_database_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 7be57e7..c38a3e6 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -24,7 +24,7 @@
#include <vector>
#include "query_execution/ForemanLite.hpp"
-#include "query_execution/PolicyEnforcer.hpp"
+#include "query_execution/PriorityPolicyEnforcer.hpp"
#include "utility/Macros.hpp"
#include "tmb/id_typedefs.h"
@@ -128,7 +128,7 @@ class Foreman final : public ForemanLite {
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
- std::unique_ptr<PolicyEnforcer> policy_enforcer_;
+ std::unique_ptr<PriorityPolicyEnforcer> policy_enforcer_;
DISALLOW_COPY_AND_ASSIGN(Foreman);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index ff734ca..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,7 +25,6 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/Learner.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
@@ -43,7 +42,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
- Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
new file mode 100644
index 0000000..44ccb0a
--- /dev/null
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PriorityPolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+ " can be allocated in a single round of dispatch of messages to"
+ " the workers.");
+
+bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  DCHECK(query_handle != nullptr);
+  if (admitted_queries_.size() < kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                           catalog_database_, storage_manager_, bus_));
+      LOG(INFO) << "Admitted query with ID: " << query_handle->query_id();
+      learner_->addQuery(*query_handle);
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // At capacity: queue it; admitted when a running query finishes.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+ // TODO(harshad) : Provide processXMessage() public functions in
+ // QueryManager, so that we need to extract message from the
+ // TaggedMessage only once.
+ std::size_t query_id;
+ switch (tagged_message.message_type()) {
+ case kWorkOrderCompleteMessage: {
+ serialization::NormalWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ worker_directory_->decrementNumQueuedWorkOrders(
+ proto.worker_thread_index());
+ learner_->addCompletionFeedback(proto);
+ if (profile_individual_workorders_) {
+ recordTimeForWorkOrder(proto);
+ }
+ break;
+ }
+ case kRebuildWorkOrderCompleteMessage: {
+ serialization::RebuildWorkOrderCompletionMessage proto;
+ // Note: This proto message contains the time it took to execute the
+ // rebuild WorkOrder. It can be accessed in this scope.
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ worker_directory_->decrementNumQueuedWorkOrders(
+ proto.worker_thread_index());
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: {
+ serialization::CatalogRelationNewBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kDataPipelineMessage: {
+ serialization::DataPipelineMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrdersAvailableMessage: {
+ serialization::WorkOrdersAvailableMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrderFeedbackMessage: {
+ WorkOrder::FeedbackMessage msg(
+ const_cast<void *>(tagged_message.message()),
+ tagged_message.message_bytes());
+ query_id = msg.header().query_id;
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer";
+ }
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ const QueryManager::QueryStatusCode return_code =
+ admitted_queries_[query_id]->processMessage(tagged_message);
+ if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PriorityPolicyEnforcer::getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+ // Iterate over admitted queries until either there are no more
+ // messages available, or the maximum number of messages have
+ // been collected.
+ DCHECK(worker_messages->empty());
+ // TODO(harshad) - Make this function generic enough so that it
+ // works well when multiple queries are getting executed.
+ std::size_t per_query_share = 0;
+ if (!admitted_queries_.empty()) {
+ per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+ } else {
+ LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ return;
+ }
+ DCHECK_GT(per_query_share, 0u);
+ std::vector<std::size_t> finished_queries_ids;
+
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManager *curr_query_manager = admitted_query_info.second.get();
+ DCHECK(curr_query_manager != nullptr);
+ std::size_t messages_collected_curr_query = 0;
+ while (messages_collected_curr_query < per_query_share) {
+ WorkerMessage *next_worker_message =
+ curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+ if (next_worker_message != nullptr) {
+ ++messages_collected_curr_query;
+ worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ } else {
+ // No more work orders from the current query at this time.
+ // Check if the query's execution is over.
+ if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ // Only record it here; removeQuery() erases from admitted_queries_, which is being iterated.
+ finished_queries_ids.push_back(admitted_query_info.first);
+ }
+ break;
+ }
+ }
+ }
+ for (const std::size_t finished_qid : finished_queries_ids) {
+ removeQuery(finished_qid);
+ }
+}
+
+void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+ LOG(WARNING) << "Removing query with ID " << query_id
+ << " that hasn't finished its execution";
+ }
+ admitted_queries_.erase(query_id);
+ learner_->removeQuery(query_id);
+}
+
+bool PriorityPolicyEnforcer::admitQueries(
+ const std::vector<QueryHandle*> &query_handles) {
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void PriorityPolicyEnforcer::recordTimeForWorkOrder(
+ const serialization::NormalWorkOrderCompletionMessage &proto) {
+ const std::size_t query_id = proto.query_id();
+ if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
+ workorder_time_recorder_[query_id];
+ }
+ workorder_time_recorder_[query_id].emplace_back(
+ proto.worker_thread_index(),
+ proto.operator_index(),
+ proto.execution_time_in_microseconds());
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
new file mode 100644
index 0000000..94cbe38
--- /dev/null
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManager.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerDirectory;
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PriorityPolicyEnforcer {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param num_numa_nodes Number of NUMA nodes used by the system.
+   * @param catalog_database The CatalogDatabase used.
+   * @param storage_manager The StorageManager used.
+   * @param worker_directory The WorkerDirectory used.
+   * @param bus The TMB.
+   * @param profile_individual_workorders Whether individual work orders are
+   *        profiled. Must be true for getProfilingResults() to be used.
+   **/
+  PriorityPolicyEnforcer(const tmb::client_id foreman_client_id,
+                         const std::size_t num_numa_nodes,
+                         CatalogDatabaseLite *catalog_database,
+                         StorageManager *storage_manager,
+                         WorkerDirectory *worker_directory,
+                         tmb::MessageBus *bus,
+                         const bool profile_individual_workorders = false)
+      : foreman_client_id_(foreman_client_id),
+        num_numa_nodes_(num_numa_nodes),
+        catalog_database_(catalog_database),
+        storage_manager_(storage_manager),
+        worker_directory_(worker_directory),
+        bus_(bus),
+        profile_individual_workorders_(profile_individual_workorders),
+        learner_(new Learner()) {}
+
+  /**
+   * @brief Destructor.
+   *
+   * @note Logs a warning if there are still admitted or waiting queries.
+   **/
+  ~PriorityPolicyEnforcer() {
+    if (hasQueries()) {
+      LOG(WARNING) << "Destructing PriorityPolicyEnforcer with some unfinished or "
+                      "waiting queries";
+    }
+  }
+
+  /**
+   * @brief Admit a query to the system.
+   *
+   * @param query_handle The QueryHandle for the new query.
+   *
+   * @return Whether the query was admitted to the system.
+   **/
+  bool admitQuery(QueryHandle *query_handle);
+
+  /**
+   * @brief Admit multiple queries in the system.
+   *
+   * @note At most kMaxConcurrentQueries queries are meant to be active at a
+   *       time. Queries that cannot be admitted right away have to wait
+   *       (see waiting_queries_).
+   *
+   * @param query_handles A vector of QueryHandles for the queries to be
+   *        admitted.
+   *
+   * @return True if all the queries were admitted, false if at least one query
+   *         was not admitted.
+   **/
+  bool admitQueries(const std::vector<QueryHandle*> &query_handles);
+
+  /**
+   * @brief Remove a given query that is under execution.
+   *
+   * @note This function is made public so that it is possible for a query
+   *       to be killed. Otherwise, it should only be used privately by the
+   *       class.
+   *
+   * TODO(harshad) - Extend this function to support removal of waiting queries.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id);
+
+  /**
+   * @brief Get worker messages to be dispatched. These worker messages come
+   *        from the active queries.
+   *
+   * @param worker_messages Output vector to which the worker messages to be
+   *        dispatched are appended.
+   **/
+  void getWorkerMessages(
+      std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
+
+  /**
+   * @brief Process a message sent to the Foreman, which gets passed on to the
+   *        policy enforcer.
+   *
+   * @param tagged_message The message.
+   **/
+  void processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Check if there are any queries to be executed.
+   *
+   * @return True if there is at least one active or waiting query, false if
+   *         the policy enforcer doesn't have any query.
+   **/
+  inline bool hasQueries() const {
+    return !(admitted_queries_.empty() && waiting_queries_.empty());
+  }
+
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled.
+   * @note query_id must have at least one recorded entry. When DCHECKs are
+   *       disabled, an unknown query_id makes at() throw std::out_of_range.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry.
+   **/
+  inline const std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());
+    return workorder_time_recorder_.at(query_id);
+  }
+
+ private:
+  // Upper bound on the number of concurrently admitted queries (presumably
+  // enforced by admitQuery()).
+  static constexpr std::size_t kMaxConcurrentQueries = 2;
+
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
+  const tmb::client_id foreman_client_id_;
+  const std::size_t num_numa_nodes_;
+
+  CatalogDatabaseLite *catalog_database_;
+  StorageManager *storage_manager_;
+  WorkerDirectory *worker_directory_;
+
+  tmb::MessageBus *bus_;
+  const bool profile_individual_workorders_;
+
+  // Key = query ID, value = owning pointer to the QueryManager of that query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+
+  // The queries which haven't been admitted yet.
+  std::queue<QueryHandle*> waiting_queries_;
+
+  // Key = Query ID.
+  // Value = A vector of tuples, each recording the execution of one work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::tuple<std::size_t, std::size_t, std::size_t>>>
+      workorder_time_recorder_;
+
+  std::unique_ptr<Learner> learner_;
+
+  DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_
[17/18] incubator-quickstep git commit: CLI support to admit a
workload.
Posted by hb...@apache.org.
CLI support to admit a workload.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bfa31b0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bfa31b0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bfa31b0a
Branch: refs/heads/scheduler++
Commit: bfa31b0a22f7217e4fe93bff7ea7ccd1a2e6e4b8
Parents: 308e58a
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 29 15:59:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:06 2016 -0500
----------------------------------------------------------------------
CMakeLists.txt | 2 -
cli/CMakeLists.txt | 4 +
cli/InputParserUtil.cpp | 30 +++
cli/InputParserUtil.hpp | 6 +
cli/QuickstepCli.cpp | 280 +++++++++++++++---------
query_execution/CMakeLists.txt | 1 +
query_execution/Learner.cpp | 11 +-
query_execution/Learner.hpp | 2 -
query_execution/PriorityPolicyEnforcer.cpp | 38 +++-
query_execution/PriorityPolicyEnforcer.hpp | 6 +-
query_execution/ProbabilityStore.hpp | 2 +-
11 files changed, 262 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20e1fb9..9d3c413 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -741,10 +741,8 @@ target_link_libraries(quickstep_cli_shell
quickstep_catalog_CatalogRelation
quickstep_cli_CommandExecutor
quickstep_cli_DefaultsConfigurator
- quickstep_cli_DropRelation
quickstep_cli_InputParserUtil
quickstep_cli_LineReader
- quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..a85d52c 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -113,6 +113,10 @@ if(QUICKSTEP_HAVE_LIBNUMA)
endif()
target_link_libraries(quickstep_cli_InputParserUtil
glog
+ quickstep_cli_DropRelation
+ quickstep_cli_PrintToScreen
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
quickstep_utility_Macros
quickstep_utility_StringUtil)
if(QUICKSTEP_HAVE_LIBNUMA)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 352883e..ffc997c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -24,6 +24,10 @@
#include <vector>
#include "catalog/CatalogConfig.h"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConfig.h"
#include "utility/StringUtil.hpp"
@@ -36,6 +40,12 @@
using std::string;
namespace quickstep {
+ class CatalogRelation;
+ class CatalogDatabase;
+ class StorageManager;
+}
+
+namespace quickstep {
std::vector<int> InputParserUtil::ParseWorkerAffinities(
const int num_workers,
@@ -87,4 +97,24 @@ std::vector<int> InputParserUtil::GetNUMANodesForCPUs() {
return numa_nodes_of_cpus;
}
+void InputParserUtil::PrintAndDropOutputRelation(
+    QueryHandle *query_handle, QueryProcessor *query_processor) {
+  // A query without a result relation has nothing to print or drop.
+  const CatalogRelation *const result_relation =
+      query_handle->getQueryResultRelation();
+  if (result_relation == nullptr) {
+    return;
+  }
+
+  // Print the result contents and then its output size to stdout, and finally
+  // drop the relation from the default database.
+  PrintToScreen::PrintRelation(
+      *result_relation, query_processor->getStorageManager(), stdout);
+  PrintToScreen::PrintOutputSize(
+      *result_relation, query_processor->getStorageManager(), stdout);
+  DropRelation::Drop(*result_relation,
+                     query_processor->getDefaultDatabase(),
+                     query_processor->getStorageManager());
}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/InputParserUtil.hpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.hpp b/cli/InputParserUtil.hpp
index ebb32d2..42e9804 100644
--- a/cli/InputParserUtil.hpp
+++ b/cli/InputParserUtil.hpp
@@ -24,6 +24,9 @@
namespace quickstep {
+class QueryHandle;
+class QueryProcessor;
+
/** \addtogroup CLI
* @{
*/
@@ -60,6 +63,9 @@ class InputParserUtil {
**/
static std::vector<int> GetNUMANodesForCPUs();
+ /**
+ * @brief Print the result relation of a query to stdout, followed by its
+ * output size, and then drop that relation from the default database.
+ * Does nothing if the query has no result relation.
+ *
+ * @param query_handle The QueryHandle of the query whose result is printed.
+ * @param query_processor The QueryProcessor providing the storage manager
+ * and default database used for printing and dropping.
+ **/
+ static void PrintAndDropOutputRelation(QueryHandle *query_handle,
+ QueryProcessor *query_processor);
+
private:
/**
* @brief Private constructor to disable instantiation of the class.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index d7b687e..3010ccc 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -161,6 +161,8 @@ DEFINE_bool(initialize_db, false, "If true, initialize a database.");
DEFINE_bool(print_query, false,
"Print each input query statement. This is useful when running a "
"large number of queries in a batch.");
+DEFINE_bool(accept_workload, false, "If true, accept a workload through CLI, "
+ "otherwise execute one query at a time");
DEFINE_string(profile_file_name, "",
"If nonempty, enable profiling using GOOGLE CPU Profiler, and write "
"its output to the given file name. This flag has no effect if "
@@ -378,129 +380,209 @@ int main(int argc, char* argv[]) {
#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
bool started_profiling = false;
#endif
- for (;;) {
- string *command_string = new string();
- *command_string = line_reader.getNextCommand();
- if (command_string->size() == 0) {
- delete command_string;
- break;
- }
+ if (!quickstep::FLAGS_accept_workload) {
+ for (;;) {
+ string *command_string = new string();
+ *command_string = line_reader.getNextCommand();
+ if (command_string->size() == 0) {
+ delete command_string;
+ break;
+ }
- if (quickstep::FLAGS_print_query) {
- printf("\n%s\n", command_string->c_str());
- }
+ if (quickstep::FLAGS_print_query) {
+ printf("\n%s\n", command_string->c_str());
+ }
- parser_wrapper->feedNextBuffer(command_string);
+ parser_wrapper->feedNextBuffer(command_string);
+
+ bool quitting = false;
+ // A parse error should reset the parser. This is because the thrown quickstep
+ // SqlError does not do the proper reset work of the YYABORT macro.
+ bool reset_parser = false;
+ for (;;) {
+ ParseResult result = parser_wrapper->getNextStatement();
+ if (result.condition == ParseResult::kSuccess) {
+ if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
+ quitting = true;
+ break;
+ }
- bool quitting = false;
- // A parse error should reset the parser. This is because the thrown quickstep
- // SqlError does not do the proper reset work of the YYABORT macro.
- bool reset_parser = false;
- for (;;) {
- ParseResult result = parser_wrapper->getNextStatement();
- if (result.condition == ParseResult::kSuccess) {
- if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
- quitting = true;
- break;
- }
+ if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+ try {
+ quickstep::cli::executeCommand(
+ *result.parsed_statement,
+ *(query_processor->getDefaultDatabase()),
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &bus,
+ query_processor->getStorageManager(),
+ query_processor.get(),
+ stdout);
+ } catch (const quickstep::SqlError &sql_error) {
+ fprintf(stderr, "%s",
+ sql_error.formatMessage(*command_string).c_str());
+ reset_parser = true;
+ break;
+ }
+ continue;
+ }
- if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+ std::unique_ptr<QueryHandle> query_handle;
try {
- quickstep::cli::executeCommand(
- *result.parsed_statement,
- *(query_processor->getDefaultDatabase()),
- main_thread_client_id,
- foreman.getBusClientID(),
- &bus,
- query_processor->getStorageManager(),
- query_processor.get(),
- stdout);
+ query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
} catch (const quickstep::SqlError &sql_error) {
- fprintf(stderr, "%s",
- sql_error.formatMessage(*command_string).c_str());
+ fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
- continue;
- }
- std::unique_ptr<QueryHandle> query_handle;
- try {
- query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
- } catch (const quickstep::SqlError &sql_error) {
- fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ std::vector<QueryHandle*> query_handles;
+ query_handles.push_back(query_handle.get());
+ start = std::chrono::steady_clock::now();
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &query_handles,
+ &bus);
+
+ try {
+ QueryExecutionUtil::ReceiveQueryCompletionMessage(
+ main_thread_client_id, &bus);
+ end = std::chrono::steady_clock::now();
+
+ InputParserUtil::PrintAndDropOutputRelation(query_handle.get(), query_processor.get());
+ query_processor->saveCatalog();
+ std::chrono::duration<double, std::milli> time_ms = end - start;
+ printf("Time: %s ms\n",
+ quickstep::DoubleToStringWithSignificantDigits(
+ time_ms.count(), 3).c_str());
+ if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+ // TODO(harshad) - Allow user specified file instead of stdout.
+ foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+ stdout);
+ }
+ } catch (const std::exception &e) {
+ fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+ break;
+ }
+ } else {
+ if (result.condition == ParseResult::kError) {
+ fprintf(stderr, "%s", result.error_message.c_str());
+ }
reset_parser = true;
break;
}
+#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
+ // Profile only if profile_file_name flag is set
+ if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
+ started_profiling = true;
+ ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+ }
+#endif
+ }
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- std::vector<QueryHandle*> query_handles;
- query_handles.push_back(query_handle.get());
- start = std::chrono::steady_clock::now();
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- main_thread_client_id,
- foreman.getBusClientID(),
- &query_handles,
- &bus);
-
- try {
- QueryExecutionUtil::ReceiveQueryCompletionMessage(
- main_thread_client_id, &bus);
- end = std::chrono::steady_clock::now();
-
- const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
- if (query_result_relation) {
- PrintToScreen::PrintRelation(*query_result_relation,
- query_processor->getStorageManager(),
- stdout);
- PrintToScreen::PrintOutputSize(
- *query_result_relation,
- query_processor->getStorageManager(),
- stdout);
-
- DropRelation::Drop(*query_result_relation,
- query_processor->getDefaultDatabase(),
- query_processor->getStorageManager());
+ if (quitting) {
+ break;
+ } else if (reset_parser) {
+ parser_wrapper.reset(new SqlParserWrapper());
+ reset_parser = false;
+ }
+ }
+ } else {
+ std::vector<QueryHandle*> query_handles;
+ for (;;) {
+ bool end_of_input = false;
+ // A parse error should reset the parser. This is because the thrown quickstep
+ // SqlError does not do the proper reset work of the YYABORT macro.
+ bool reset_parser = false;
+ string *command_string = new string();
+ *command_string = line_reader.getNextCommand();
+ if (command_string->size() == 0) {
+ delete command_string;
+ end_of_input = true;
+ reset_parser = true;
+ } else {
+ if (quickstep::FLAGS_print_query) {
+ printf("\n%s\n", command_string->c_str());
+ }
+ parser_wrapper->feedNextBuffer(command_string);
+ end_of_input = false;
+ reset_parser = false;
+ }
+ for (;;) {
+ ParseResult result = parser_wrapper->getNextStatement();
+ // Check if the input has ended.
+ if (end_of_input && (result.condition == ParseResult::kEndOfInput ||
+ (result.condition == ParseResult::kSuccess &&
+ result.parsed_statement->getStatementType() ==
+ ParseStatement::kQuit))) {
+ if (!query_handles.empty()) {
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &query_handles,
+ &bus);
+ try {
+ QueryExecutionUtil::ReceiveQueryCompletionMessage(
+ main_thread_client_id, &bus);
+
+ for (std::size_t i = 0; i < query_handles.size(); ++i) {
+ InputParserUtil::PrintAndDropOutputRelation(
+ query_handles[i], query_processor.get());
+ }
+ query_processor->saveCatalog();
+ if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+ // TODO(harshad) - Allow user specified file instead of stdout.
+ for (std::size_t i = 0; i < query_handles.size(); ++i) {
+ foreman.printWorkOrderProfilingResults(
+ query_handles[i]->query_id(), stdout);
+ }
+ }
+ } catch (const std::exception &e) {
+ fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+ break;
+ }
}
-
- query_processor->saveCatalog();
- std::chrono::duration<double, std::milli> time_ms = end - start;
- printf("Time: %s ms\n",
- quickstep::DoubleToStringWithSignificantDigits(
- time_ms.count(), 3).c_str());
- if (quickstep::FLAGS_profile_and_report_workorder_perf) {
- // TODO(harshad) - Allow user specified file instead of stdout.
- foreman.printWorkOrderProfilingResults(query_handle->query_id(),
- stdout);
+ reset_parser = true;
+ break;
+ } else if (result.condition == ParseResult::kSuccess) {
+ if (result.parsed_statement->getStatementType() == ParseStatement::kSelect) {
+ std::unique_ptr<QueryHandle> query_handle;
+ try {
+ query_handle.reset(query_processor->generateQueryHandle(
+ *result.parsed_statement));
+ } catch (const quickstep::SqlError &sql_error) {
+ fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+ reset_parser = true;
+ break;
+ }
+
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ query_handles.push_back(query_handle.release());
+ } else {
+ LOG(INFO) << "Only select queries are accepted in the workload";
}
- } catch (const std::exception &e) {
- fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
break;
- }
- } else {
- if (result.condition == ParseResult::kError) {
+ } else if (result.condition == ParseResult::kError) {
fprintf(stderr, "%s", result.error_message.c_str());
+ reset_parser = true;
+ break;
+ } else {
+ LOG(FATAL) << "Unhandled case";
+ break;
}
- reset_parser = true;
- break;
}
-#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
- // Profile only if profile_file_name flag is set
- if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
- started_profiling = true;
- ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+ if (end_of_input) {
+ break;
+ } else if (reset_parser) {
+ parser_wrapper.reset(new SqlParserWrapper());
+ reset_parser = false;
}
-#endif
- }
-
- if (quitting) {
- break;
- } else if (reset_parser) {
- parser_wrapper.reset(new SqlParserWrapper());
- reset_parser = false;
}
}
+
#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
if (started_profiling) {
ProfilerStop();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 104f9da..13b74e3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -242,6 +242,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_PriorityPolicyEnforcer
+ quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index bb24baa..9801f60 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -101,11 +101,12 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
// queries.
DCHECK(mean_workorders_per_query.find(query_id) !=
mean_workorders_per_query.end());
- DCHECK_NE(mean_workorders_per_query[query_id], 0);
- current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
- query_id,
- 1 / static_cast<float>(mean_workorders_per_query[query_id]),
- denominator);
+ if (mean_workorders_per_query[query_id] != 0) {
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id,
+ 1 / static_cast<float>(mean_workorders_per_query[query_id]),
+ denominator);
+ }
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8634842..ef92db9 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -451,8 +451,6 @@ class Learner {
for (const auto &element : mean_workorder_per_query) {
if (element.second != 0) {
denominator += 1/static_cast<float>(element.second);
- /*} else {
- return 0;*/
}
}
return denominator;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index f9a741d..0a15094 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,6 +62,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
}
} else {
// This query will have to wait.
+ LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
waiting_queries_.push(query_handle);
return false;
}
@@ -166,7 +167,7 @@ void PriorityPolicyEnforcer::getWorkerMessages(
return;
}
DCHECK_GT(per_query_share, 0u);
- std::vector<std::size_t> finished_queries_ids;
+ std::unordered_map<std::size_t, bool> finished_queries_ids;
if (learner_->hasActiveQueries()) {
// Key = priority level. Value = Whether we have already checked the
@@ -196,8 +197,8 @@ void PriorityPolicyEnforcer::getWorkerMessages(
DLOG(INFO) << "No active queries in the learner at this point.";
return;
}
- for (const std::size_t finished_qid : finished_queries_ids) {
- removeQuery(finished_qid);
+ for (auto finished_qid_pair : finished_queries_ids) {
+ removeQuery(finished_qid_pair.first);
}
}
@@ -225,6 +226,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
}
// Remove the query from the learner.
learner_->removeQuery(query_id);
+ LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
+ // Admit waiting queries, if any.
}
bool PriorityPolicyEnforcer::admitQueries(
@@ -251,7 +254,7 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
const std::size_t priority_level,
- std::vector<std::size_t> *finished_queries_ids) {
+ std::unordered_map<std::size_t, bool> *finished_queries_ids) {
// Key = query ID from the given priority level, value = whether we have
// checked this query earlier.
std::unordered_map<std::size_t, bool> checked_query_ids;
@@ -262,9 +265,28 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
// No query available at this time in this priority level.
return nullptr;
} else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
- // We have already seen this query ID, try one more time.
- LOG(INFO) << "We have already seen this query, continue";
- continue;
+ // Find a query from the same priority level, but not present in the
+ // checked_query_ids map.
+ for (const std::size_t qid : priority_query_ids_[priority_level]) {
+ if (checked_query_ids.find(qid) == checked_query_ids.end() &&
+ finished_queries_ids->find(qid) == finished_queries_ids->end()) {
+ // Query not seen already.
+ QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(qid)].get();
+ DCHECK(chosen_query_manager != nullptr);
+ std::unique_ptr<WorkerMessage> next_worker_message(
+ chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+ if (next_worker_message != nullptr) {
+ // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
+ return next_worker_message.release();
+ } else {
+ // This query doesn't have any WorkerMessage right now. Mark as checked.
+ checked_query_ids[qid] = true;
+ if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ (*finished_queries_ids)[static_cast<std::size_t>(qid)] = true;
+ }
+ }
+ }
+ }
} else {
// We haven't seen this query earlier. Check if it has any schedulable
// WorkOrder.
@@ -277,7 +299,7 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
// This query doesn't have any WorkerMessage right now. Mark as checked.
checked_query_ids[chosen_query_id] = true;
if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+ (*finished_queries_ids)[static_cast<std::size_t>(chosen_query_id)] = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 281c066..eafb099 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -170,7 +170,7 @@ class PriorityPolicyEnforcer {
}
private:
- static constexpr std::size_t kMaxConcurrentQueries = 2;
+ static constexpr std::size_t kMaxConcurrentQueries = 100;
/**
* @brief Record the execution time for a finished WorkOrder.
@@ -188,7 +188,7 @@ class PriorityPolicyEnforcer {
*
* @param priority_level The priority level from which the query will be
* chosen.
- * @param finished_query_ids A vector of query IDs that have finished their
+ * @param finished_query_ids A map of query IDs that have finished their
* execution.
*
* @return A WorkerMessage. If no query can be chosen from this priority level,
@@ -196,7 +196,7 @@ class PriorityPolicyEnforcer {
**/
WorkerMessage *getNextWorkerMessageFromPriorityLevel(
const std::size_t priority_level,
- std::vector<std::size_t> *finished_queries_ids);
+ std::unordered_map<std::size_t, bool> *finished_queries_ids);
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 7278e2b..4ae085e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -286,7 +286,7 @@ class ProbabilityStore {
for (auto it = individual_probabilities_.begin();
it != individual_probabilities_.end();
++it) {
- DCHECK_LE(it->second.first, common_denominator_);
+ // DCHECK_LE(it->second.first, common_denominator_);
it->second.second = it->second.first / common_denominator_;
}
updateCumulativeProbabilities();
[04/18] incubator-quickstep git commit: Created Learner class.
Posted by hb...@apache.org.
Created Learner class.
- Learner keeps track of statistics of concurrent queries
- It maintains the probabilities for individual queries as well as the
priority levels in the system.
- Changes in ProbabilityStore class including addition of numerator,
denominator and more unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c5f15cae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c5f15cae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c5f15cae
Branch: refs/heads/scheduler++
Commit: c5f15caef676a145441aef78c1c897857ae6b3b1
Parents: 0100dee
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:05 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 11 +
query_execution/ExecutionStats.hpp | 18 +
query_execution/Learner.cpp | 195 ++++++++++
query_execution/Learner.hpp | 352 +++++++++++++++++++
query_execution/PolicyEnforcer.cpp | 2 +
query_execution/ProbabilityStore.hpp | 148 ++++++--
.../tests/ProbabilityStore_unittest.cpp | 45 ++-
7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+ ${GFLAGS_LIB_NAME}
+ glog
+ quickstep_queryexecution_ExecutionStats
+ quickstep_queryexecution_ProbabilityStore
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_Learner
quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_Foreman
quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
}
/**
+   * @brief Check whether any execution statistics are present for this query.
+   *
+   * @return True if at least one operator is tracked and every tracked
+   *         operator has at least one recorded execution time. False
+   *         otherwise, including when no operator is tracked at all (the
+   *         previous loop returned true vacuously in that case).
+   **/
+  inline bool hasStats() const {
+    if (active_operators_.empty()) {
+      // Nothing has been recorded yet, so there are no stats to report.
+      return false;
+    }
+    for (auto it = active_operators_.cbegin();
+         it != active_operators_.cend();
+         ++it) {
+      if (!it->second->hasStatsForOperator()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ /**
* @brief Get the current stats.
*
* @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
DCHECK_LE(times_.size(), max_entries_);
}
+  /**
+   * @brief Whether at least one execution time is recorded for this operator.
+   **/
+  inline bool hasStatsForOperator() const {
+    return times_.size() != 0u;
+  }
+
private:
const std::size_t max_entries_;
std::deque<std::uint64_t> times_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void Learner::addCompletionFeedback(
+    const serialization::NormalWorkOrderCompletionMessage
+        &workorder_completion_proto) {
+  // Record the execution time of the completed work order against the
+  // operator of the query that produced it.
+  const std::size_t query_id = workorder_completion_proto.query_id();
+  DCHECK(isQueryPresent(query_id));
+  const std::size_t priority_level = getQueryPriority(query_id);
+  ExecutionStats *execution_stats = getExecutionStats(query_id);
+  DCHECK(execution_stats != nullptr);
+  execution_stats->addEntry(
+      workorder_completion_proto.execution_time_in_microseconds(),
+      workorder_completion_proto.operator_index());
+
+  // Refresh the cached "feedback from all queries" flag of this priority
+  // level while it is still unset. Guarding on the live check
+  // hasFeedbackFromAllQueriesInPriorityLevel() instead made the update a
+  // no-op: the update only ran when some query lacked stats, which is
+  // exactly when updateFeedbackFromQueriesInPriorityLevel() returns without
+  // setting the flag.
+  const auto feedback_it = has_feedback_from_all_queries_.find(priority_level);
+  if (feedback_it == has_feedback_from_all_queries_.end() ||
+      !feedback_it->second) {
+    updateFeedbackFromQueriesInPriorityLevel(priority_level);
+  }
+}
+
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+    const std::size_t priority_level, const std::size_t query_id) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    LOG(INFO) << "Updating probabilities for query ID: " << query_id
+              << " and priority level: " << priority_level
+              << " that has no queries";
+    return;
+  } else if (execution_stats_[priority_level].size() == 1u) {
+    DCHECK(current_probabilities_[priority_level] != nullptr);
+    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+    // The lone query in this priority level gets probability exactly 1. Use
+    // numerator == denominator == 1 instead of reading the float denominator
+    // back into an integer, which truncated it and yielded a probability < 1.
+    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+        query_id, 1.0f, 1.0f);
+    return;
+  }
+  // More than one query in this priority level: the probability of a query is
+  // proportional to the reciprocal of its mean work order execution time,
+  // i.e. (1 / t_q) / sum_i (1 / t_i).
+  std::unordered_map<std::size_t, std::size_t> mean_workorders_per_query =
+      getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+  const float denominator = calculateDenominator(mean_workorders_per_query);
+  if (denominator != 0) {
+    const auto query_it = mean_workorders_per_query.find(query_id);
+    DCHECK(query_it != mean_workorders_per_query.end());
+    if (query_it == mean_workorders_per_query.end()) {
+      // Unknown query for this level; nothing to update.
+      return;
+    }
+    // calculateDenominator() returns 0 if any mean time is 0, so the mean
+    // time here is non-zero and the reciprocal is well defined.
+    const float numerator = 1 / static_cast<float>(query_it->second);
+    // Update the numerator for the given query and the denominator for all
+    // the queries.
+    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+        query_id, numerator, denominator);
+  } else {
+    // At least one of the queries has predicted time for next work order as 0.
+    // In such a case, we don't update the probabilities and continue to use
+    // the older probabilities.
+  }
+}
+
+// The argument is not needed: all active priority levels are recomputed. (It
+// was previously, and wrongly, used to count the queries of every level.)
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+    const std::size_t /* priority_level */) {
+  if (!hasFeedbackFromAllPriorityLevels() ||
+      has_feedback_from_all_queries_.empty()) {
+    // Either we don't have enough feedback messages from all the priority
+    // levels OR there are no active queries in the system.
+    return;
+  }
+  // Predicted work order execution time of each level: the average of the
+  // mean work order times of the queries in that level.
+  std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+  std::size_t sum_active_priorities = 0;
+  for (const auto &priority_entry : has_feedback_from_all_queries_) {
+    const std::size_t curr_priority_level = priority_entry.first;
+    const std::size_t num_queries_in_priority_level =
+        execution_stats_[curr_priority_level].size();
+    DCHECK_GT(num_queries_in_priority_level, 0u);
+    if (num_queries_in_priority_level == 0) {
+      // Avoid dividing by zero in release builds.
+      continue;
+    }
+    sum_active_priorities += curr_priority_level;
+    const std::unordered_map<std::size_t, std::size_t>
+        mean_workorders_all_queries_curr_level =
+            getMeanWorkOrderTimesForQueriesInPriorityLevel(curr_priority_level);
+    std::size_t total_time_curr_level = 0;
+    for (const auto &mean_workorder_entry :
+         mean_workorders_all_queries_curr_level) {
+      total_time_curr_level += mean_workorder_entry.second;
+    }
+    predicted_time_for_level[curr_priority_level] =
+        total_time_curr_level / num_queries_in_priority_level;
+  }
+  DCHECK_GT(sum_active_priorities, 0u);
+  if (sum_active_priorities == 0) {
+    return;
+  }
+  // Number of work orders each level may execute in a unit total time: its
+  // share of the total priority divided by its predicted work order time.
+  // Key = priority level, value = that number. It must stay a float: every
+  // share is below 1, so an integer would truncate all of them to 0.
+  std::unordered_map<std::size_t, float> num_workorders_for_level;
+  float total_num_workorders = 0;
+  for (const auto &predicted_time_entry : predicted_time_for_level) {
+    const std::size_t curr_priority_level = predicted_time_entry.first;
+    const float num_workorders_for_curr_level =
+        (predicted_time_entry.second == 0)
+            ? 0.0f
+            : static_cast<float>(curr_priority_level) /
+                  sum_active_priorities /
+                  static_cast<float>(predicted_time_entry.second);
+    num_workorders_for_level[curr_priority_level] =
+        num_workorders_for_curr_level;
+    total_num_workorders += num_workorders_for_curr_level;
+  }
+  if (total_num_workorders == 0) {
+    // No priority level can be selected at this point.
+    return;
+  }
+  // Finally, compute the probabilities.
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  priority_levels.reserve(num_workorders_for_level.size());
+  numerators.reserve(num_workorders_for_level.size());
+  for (const auto &num_workorders_entry : num_workorders_for_level) {
+    priority_levels.emplace_back(num_workorders_entry.first);
+    numerators.emplace_back(num_workorders_entry.second);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, total_num_workorders);
+}
+
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+  // Every query of a priority level gets the same default probability:
+  // numerator 1 per query over a common denominator equal to the number of
+  // queries in that level.
+  for (const auto &level_and_queries : execution_stats_) {
+    const std::size_t level = level_and_queries.first;
+    const auto &queries_in_level = level_and_queries.second;
+    DCHECK(!queries_in_level.empty());
+    std::vector<std::size_t> ids;
+    ids.reserve(queries_in_level.size());
+    for (const auto &query_entry : queries_in_level) {
+      ids.emplace_back(query_entry.first);
+    }
+    const std::vector<float> ones(ids.size(), 1.0);
+    // Start this level from a fresh store.
+    default_probabilities_[level].reset(new ProbabilityStore());
+    default_probabilities_[level]->addOrUpdateObjectsNewDenominator(
+        ids, ones, ids.size());
+  }
+}
+
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+  // The default probability of a priority level is proportional to the level
+  // itself: numerator = priority level, denominator = sum of all active
+  // priority levels, so that the probabilities sum to 1.
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  priority_levels.reserve(execution_stats_.size());
+  numerators.reserve(execution_stats_.size());
+  float sum_priority_levels = 0;
+  for (const auto &priority_entry : execution_stats_) {
+    const std::size_t curr_priority_level = priority_entry.first;
+    // The denominator must be the sum of the numerators (the priority
+    // levels), not the number of queries. Otherwise a numerator can exceed
+    // the denominator and the probabilities do not sum to 1.
+    sum_priority_levels += curr_priority_level;
+    priority_levels.emplace_back(curr_priority_level);
+    numerators.emplace_back(curr_priority_level);
+  }
+  if (priority_levels.empty()) {
+    // No active priority level; leave the store empty.
+    return;
+  }
+  if (sum_priority_levels <= 0) {
+    // Levels are unique keys, so this means only priority level 0 is active.
+    // Give it the whole probability mass instead of passing a zero
+    // denominator, which ProbabilityStore rejects with CHECK_GT.
+    std::fill(numerators.begin(), numerators.end(), 1.0f);
+    sum_priority_levels = static_cast<float>(numerators.size());
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, sum_priority_levels);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Maximum number of past WorkOrder execution-time entries kept per query; it
+// is passed to each query's ExecutionStats in Learner::initializeQuery().
+// NOTE(review): DEFINE_int32 in a header emits a definition of
+// FLAGS_max_past_entries_learner in every translation unit that includes this
+// header (both Learner.cpp and PolicyEnforcer.cpp do), which likely fails to
+// link with duplicate symbols. Consider DECLARE_int32 here and moving this
+// DEFINE_int32 into Learner.cpp.
+DEFINE_int32(max_past_entries_learner,
+ 10,
+ "The maximum number of past WorkOrder execution statistics"
+ " entries for a query");
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+class Learner {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note The ProbabilityStore for the priority levels is created up front.
+   *       addQuery() and removeQuery() update it (through
+   *       initializePriorityLevelIfNotPresent() and
+   *       checkAndRemovePriorityLevel()) before relearn() has created it, so
+   *       it would otherwise be a null dereference for the very first query.
+   **/
+  Learner()
+      : probabilities_of_priority_levels_(new ProbabilityStore()) {
+  }
+
+  /**
+   * @brief Record the execution time reported by a completed normal work
+   *        order for its query.
+   *
+   * @param workorder_completion_proto The completion message of the work
+   *        order.
+   **/
+  void addCompletionFeedback(
+      const serialization::NormalWorkOrderCompletionMessage
+          &workorder_completion_proto);
+
+  /**
+   * @brief Start tracking a new query and recompute the default
+   *        probabilities.
+   *
+   * @param query_handle The handle of the new query.
+   **/
+  void addQuery(const QueryHandle &query_handle) {
+    initializePriorityLevelIfNotPresent(query_handle.query_priority());
+    initializeQuery(query_handle);
+    relearn();
+  }
+
+  /**
+   * @brief Stop tracking a query, drop its priority level if no other query
+   *        remains in it, and recompute the default probabilities.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
+  void removeQuery(const std::size_t query_id) {
+    DCHECK(isQueryPresent(query_id));
+    const std::size_t priority_level = getQueryPriority(query_id);
+    // Find the iterator to the query in execution_stats_ and erase it.
+    const auto stats_iter = getExecutionStatsIterMutable(query_id);
+    DCHECK(stats_iter != execution_stats_[priority_level].cend());
+    execution_stats_[priority_level].erase(stats_iter);
+    // NOTE(review): removeObject() expects the query to have an entry in this
+    // store. addQuery() does not add one; only
+    // updateProbabilitiesForQueriesInPriorityLevel() does. Confirm callers.
+    current_probabilities_[priority_level]->removeObject(query_id);
+    // Forget the query, so that isQueryPresent() and hasActiveQueries()
+    // reflect its removal.
+    query_id_to_priority_lookup_.erase(query_id);
+    checkAndRemovePriorityLevel(priority_level);
+    relearn();
+  }
+
+  /**
+   * @brief Stop tracking the statistics of one operator of a query.
+   *
+   * @param query_id The ID of the query.
+   * @param operator_id The ID of the operator.
+   **/
+  void removeOperator(const std::size_t query_id,
+                      const std::size_t operator_id) {
+    ExecutionStats *stats = getExecutionStats(query_id);
+    DCHECK(stats != nullptr);
+    if (stats != nullptr) {
+      stats->removeOperator(operator_id);
+    }
+  }
+
+  /**
+   * @brief Reinitialize the default probabilities of the queries and of the
+   *        priority levels. Does nothing if there are no active queries.
+   **/
+  void relearn() {
+    if (hasActiveQueries()) {
+      initializeDefaultProbabilitiesForAllQueries();
+      initializeDefaultProbabilitiesForPriorityLevels();
+    }
+  }
+
+  /**
+   * @brief Update the probability of the given query relative to the other
+   *        queries in its priority level.
+   *
+   * @param priority_level The priority level of the query.
+   * @param query_id The ID of the query.
+   **/
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  /**
+   * @brief Update the probabilities of all the active priority levels.
+   **/
+  // TODO(harshad) - Cache internal results from previous invocation of this
+  // function and reuse them. There's a lot of redundancy in computations
+  // at this point.
+  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+  /**
+   * @brief Initialize the default probabilities for the queries.
+   **/
+  void initializeDefaultProbabilitiesForAllQueries();
+
+  /**
+   * @brief Initialize the default probabilities for the priority levels.
+   **/
+  void initializeDefaultProbabilitiesForPriorityLevels();
+
+  /**
+   * @brief Initialize the data structures for a given priority level, if none
+   *        exist. If there are already data structures for the given priority
+   *        level, do nothing.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void initializePriorityLevelIfNotPresent(
+      const std::size_t priority_level) {
+    // Only create the structures when the level is NOT yet present (the
+    // condition used to be inverted, which reset existing levels and never
+    // created new ones).
+    if (!isPriorityLevelPresent(priority_level)) {
+      current_probabilities_[priority_level].reset(new ProbabilityStore());
+      // TODO(harshad) - Calculate a proper default probability for the
+      // priority level here instead of 0. addQuery() calls relearn() right
+      // after this, which rebuilds the priority-level store.
+      probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+      // Create an empty list of queries for this priority level.
+      execution_stats_[priority_level];
+    }
+  }
+
+  /**
+   * @brief Remove the given priority level from the local data structures if
+   *        no query remains in it.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (execution_stats_[priority_level].empty()) {
+      execution_stats_.erase(priority_level);
+      current_probabilities_.erase(priority_level);
+      // relearn() only rebuilds the default stores of levels still present in
+      // execution_stats_, so drop this level's store explicitly.
+      default_probabilities_.erase(priority_level);
+      probabilities_of_priority_levels_->removeObject(priority_level);
+      has_feedback_from_all_queries_.erase(priority_level);
+    }
+  }
+
+  /**
+   * @brief Check if the Learner has presence of the given priority level.
+   **/
+  inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+    DCHECK_EQ((current_probabilities_.find(priority_level) ==
+               current_probabilities_.end()),
+              execution_stats_.find(priority_level) == execution_stats_.end());
+    return (execution_stats_.find(priority_level) != execution_stats_.end());
+  }
+
+  /**
+   * @brief Check if the query is present.
+   **/
+  inline bool isQueryPresent(const std::size_t query_id) const {
+    return query_id_to_priority_lookup_.find(query_id) !=
+           query_id_to_priority_lookup_.end();
+  }
+
+  /**
+   * @brief Initialize all the data structures for a new query.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
+  void initializeQuery(const QueryHandle &query_handle) {
+    const std::size_t priority_level = query_handle.query_priority();
+    const std::size_t query_id = query_handle.query_id();
+    DCHECK(isPriorityLevelPresent(priority_level));
+    query_id_to_priority_lookup_[query_id] = priority_level;
+    execution_stats_[priority_level].emplace_back(
+        query_id,
+        std::unique_ptr<ExecutionStats>(
+            new ExecutionStats(FLAGS_max_past_entries_learner)));
+    // As we are initializing the query, we obviously haven't gotten any
+    // feedback message for this query. Hence mark the following field as false.
+    has_feedback_from_all_queries_[priority_level] = false;
+  }
+
+  /**
+   * @brief Get the execution stats object for the given query.
+   *
+   * @return A pointer to the ExecutionStats for the query. If not present,
+   *         returns NULL.
+   **/
+  inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+    if (isQueryPresent(query_id)) {
+      const auto stats_iter = getExecutionStatsIterMutable(query_id);
+      DCHECK(stats_iter !=
+             std::end(execution_stats_[getQueryPriority(query_id)]));
+      return stats_iter->second.get();
+    }
+    return nullptr;
+  }
+
+  /**
+   * @brief Find the entry of the given query in the stats vector of its
+   *        priority level.
+   *
+   * @note The query must be present (see isQueryPresent()). getQueryPriority()
+   *       dereferences the lookup result without a release-mode check.
+   **/
+  inline std::vector<
+      std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+  getExecutionStatsIterMutable(const std::size_t query_id) {
+    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_[priority_level];
+    return std::find_if(
+        stats_vector.begin(),
+        stats_vector.end(),
+        [&query_id](
+            const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+          return p.first == query_id;
+        });
+  }
+
+  /**
+   * @brief Get the priority level of a present query.
+   **/
+  inline std::size_t getQueryPriority(const std::size_t query_id) const {
+    const auto it = query_id_to_priority_lookup_.find(query_id);
+    DCHECK(it != query_id_to_priority_lookup_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Check if we have received at least one feedback message from all the
+   *        queries in the given priority level.
+   **/
+  inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    for (const auto &query_stats : execution_stats_.at(priority_level)) {
+      DCHECK(query_stats.second != nullptr);
+      if (!query_stats.second->hasStats()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Set the cached feedback flag of the given priority level once every
+   *        query in it has at least one execution statistic.
+   **/
+  inline void updateFeedbackFromQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+      has_feedback_from_all_queries_[priority_level] = true;
+    }
+  }
+
+  /**
+   * @brief Whether at least one query is being tracked.
+   **/
+  inline bool hasActiveQueries() const {
+    return !query_id_to_priority_lookup_.empty();
+  }
+
+  /**
+   * @brief Get the mean work order execution times for all the queries in
+   *        a given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return An unordered_map in which: Key = query ID.
+   *         Value = Mean time per work order for that query (0 if the query
+   *         has no recorded work orders).
+   **/
+  inline std::unordered_map<std::size_t, std::size_t>
+  getMeanWorkOrderTimesForQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    std::unordered_map<std::size_t, std::size_t> result;
+    for (const auto &query_stats : execution_stats_[priority_level]) {
+      DCHECK(query_stats.second.get() != nullptr);
+      auto current_stats = query_stats.second->getCurrentStats();
+      result[query_stats.first] =
+          current_stats.second == 0 ? 0
+                                    : current_stats.first / current_stats.second;
+    }
+    return result;
+  }
+
+  /**
+   * @brief Compute the common denominator sum_q (1 / mean_time_q).
+   *
+   * @param mean_workorder_per_query Key = query ID, value = mean time per
+   *        work order for that query.
+   *
+   * @note If any query has mean work order time as 0, we return 0 as the
+   *       denominator.
+   *
+   * @return The denominator to be used for probability calculations.
+   **/
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
+    float denominator = 0;
+    for (const auto &element : mean_workorder_per_query) {
+      if (element.second == 0) {
+        return 0;
+      }
+      denominator += 1 / static_cast<float>(element.second);
+    }
+    return denominator;
+  }
+
+  /**
+   * @brief Check that every active priority level has feedback from all of
+   *        its queries.
+   **/
+  inline bool hasFeedbackFromAllPriorityLevels() const {
+    for (const auto &feedback : has_feedback_from_all_queries_) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Key = Priority level, value = A vector of pairs.
+  // Each pair:
+  // 1st element: Query ID.
+  // 2nd Element: Execution statistics for the query.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+      execution_stats_;
+
+  // Key = query ID, value = priority level for the query ID.
+  std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+  // Key = priority level, value = ProbabilityStore for the queries belonging to
+  // that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      current_probabilities_;
+
+  // Key = priority level, value = ProbabilityStore holding the default
+  // probabilities of the queries in that level, rebuilt by relearn().
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      default_probabilities_;
+
+  // ProbabilityStore for probabilities mapped to the priority levels.
+  std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+  // Key = priority level. Value = A boolean that indicates if we have received
+  // feedback from all the queries in the given priority level.
+  // TODO(harshad) - Invalidate the cache whenever needed.
+  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+ Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <random>
#include <unordered_map>
+#include <utility>
#include <vector>
#include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : mt_(std::random_device()()) {}
+ : common_denominator_(1.0), mt_(std::random_device()()) {}
/**
* @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
return individual_probabilities_.size();
}
+  /**
+   * @brief Get the common denominator shared by all objects in the store.
+   *
+   * @note Returned as float, the type of common_denominator_. The previous
+   *       std::size_t return type silently truncated fractional denominators.
+   **/
+  inline float getDenominator() const {
+    return common_denominator_;
+  }
+
/**
* @brief Add individual (not cumulative) probability for a given object.
*
@@ -59,16 +64,48 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param property The property of the given object.
- * @param individual_probability The individual (not cumulative) probability
- * of the given object.
+ * @param numerator The numerator for the given object.
**/
- void addProbability(const std::size_t property,
- const float individual_probability) {
- individual_probabilities_[property] = individual_probability;
+ void addOrUpdateObject(const std::size_t property,
+ const float numerator) {
+ DCHECK_LE(numerator, common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[property] =
+ std::make_pair(numerator, numerator / common_denominator_);
updateCumulativeProbabilities();
}
/**
+ * @brief Add individual (not cumulative) probability for a given object with
+ * updated denominator.
+ *
+ * @note This function leaves the cumulative probabilities in a consistent
+ * state. An alternative lazy implementation should be written if cost
+ * of calculating cumulative probabilities is high.
+ * @note This function may override previously written probability values.
+ *
+ * @param property The property of the given object.
+ * @param numerator The numerator for the given object.
+ * @param new_denominator The updated denominator for the store.
+ **/
+ void addOrUpdateObjectNewDenominator(const std::size_t property,
+ const float numerator,
+ const float new_denominator) {
+ CHECK_GT(new_denominator, 0u);
+ DCHECK_LE(numerator, new_denominator);
+ common_denominator_ = new_denominator;
+ // It is alright to not have the correct probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we compute the probabilities for all the objects in
+ // updateProbabilitiesNewDenominator().
+ individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+ updateProbabilitiesNewDenominator();
+ }
+
+ /**
* @brief Add individual (not cumulative) probabilities for given objects.
*
* @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param properties A vector of properties to be added.
- * @param individual_probabilities The individual (not cumulative)
- * probabilities of the given objects.
+ * @param numerators The numerators of the given objects.
**/
- void addProbabilities(const std::vector<std::size_t> &properties,
- const std::vector<float> &individual_probabilities) {
- DCHECK_EQ(properties.size(), individual_probabilities.size());
+ void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+ const std::vector<float> &numerators) {
+ DCHECK_EQ(properties.size(), numerators.size());
for (std::size_t i = 0; i < properties.size(); ++i) {
- individual_probabilities_[properties[i]] = individual_probabilities[i];
+ DCHECK_LE(numerators[i], common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[properties[i]] =
+ std::make_pair(numerators[i], numerators[i] / common_denominator_);
}
updateCumulativeProbabilities();
}
- /**
- * @brief Update the probability of a given object to a new value.
- *
- * @param property The property of the object.
- * @param new_individual_probability The new probability to be set.
- **/
- void updateProbability(const std::size_t property,
- const float new_individual_probability) {
- auto it = individual_probabilities_.find(property);
- DCHECK(it != individual_probabilities_.end());
- it->second = new_individual_probability;
- updateCumulativeProbabilities();
+  /**
+   * @brief Insert or overwrite several objects' numerators at once, replace
+   *        the common denominator, and recompute all individual and
+   *        cumulative probabilities.
+   *
+   * @param properties The properties identifying the objects.
+   * @param numerators The numerators, parallel to properties.
+   * @param new_denominator The new common denominator; must be positive.
+   **/
+  void addOrUpdateObjectsNewDenominator(
+      const std::vector<std::size_t> &properties,
+      const std::vector<float> &numerators,
+      const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_EQ(properties.size(), numerators.size());
+    common_denominator_ = new_denominator;
+    std::size_t idx = 0;
+    for (const std::size_t property : properties) {
+      DCHECK_LE(numerators[idx], common_denominator_);
+      // The probability slot is refreshed for every object by
+      // updateProbabilitiesNewDenominator() below.
+      individual_probabilities_[property] =
+          std::make_pair(numerators[idx], 0.0);
+      ++idx;
+    }
+    updateProbabilitiesNewDenominator();
}
/**
@@ -109,10 +156,24 @@ class ProbabilityStore {
* @param property The property of the object to be removed.
**/
void removeObject(const std::size_t property) {
- auto it = individual_probabilities_.find(property);
- DCHECK(it != individual_probabilities_.end());
- individual_probabilities_.erase(it);
- updateCumulativeProbabilities();
+ auto individual_it = individual_probabilities_.find(property);
+ DCHECK(individual_it != individual_probabilities_.end());
+ individual_probabilities_.erase(individual_it);
+ if (!individual_probabilities_.empty()) {
+ float new_denominator = 0;
+ for (auto it = individual_probabilities_.begin();
+ it != individual_probabilities_.end();
+ ++it) {
+ new_denominator += it->second.first;
+ }
+ CHECK_GT(new_denominator, 0);
+ common_denominator_ = new_denominator;
+ updateProbabilitiesNewDenominator();  // Denominator changed: refresh individual + cumulative.
+ } else {
+ // In order to keep the store consistent, we should keep the sizes of
+ // individual_probabilities_ and cumulative_probabilities_ the same.
+ cumulative_probabilities_.clear();
+ }
}
/**
@@ -123,7 +184,7 @@ class ProbabilityStore {
const float getIndividualProbability(const std::size_t property) const {
const auto it = individual_probabilities_.find(property);
DCHECK(it != individual_probabilities_.end());
- return it->second;
+ return it->second.second;
}
/**
@@ -141,13 +202,13 @@ class ProbabilityStore {
return;
}
float cumulative_probability = 0;
- for (const auto property_probability_pair : individual_probabilities_) {
- cumulative_probabilities_.emplace_back(property_probability_pair.first,
+ for (const auto &p : individual_probabilities_) {
+ cumulative_probabilities_.emplace_back(p.first,
cumulative_probability);
- cumulative_probability += property_probability_pair.second;
+ cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that floating
- // addition related rounding issues are ignored.
+ // Adjust the last cumulative probability manually to 1.0, so that
+ // floating addition related rounding issues are ignored.
cumulative_probabilities_.back().updateProbability(1.0);
}
@@ -208,9 +269,26 @@ class ProbabilityStore {
return it->property_;
}
- std::unordered_map<std::size_t, float> individual_probabilities_;
+ inline void updateProbabilitiesNewDenominator() {
+ // First update the individual probabilities.
+ for (auto it = individual_probabilities_.begin();
+ it != individual_probabilities_.end();
+ ++it) {
+ DCHECK_LE(it->second.first, common_denominator_);
+ it->second.second = it->second.first / common_denominator_;
+ }
+ updateCumulativeProbabilities();
+ }
+
+ // Key = property of the object.
+ // Value = A pair ...
+ // 1st element: Numerator of the object.
+ // 2nd element: Individual probability of the object.
+ std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
std::vector<ProbabilityInfo> cumulative_probabilities_;
+ float common_denominator_;
+
std::mt19937_64 mt_;
DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5f15cae/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -28,14 +28,15 @@ TEST(ProbabilityStoreTest, CountTest) {
ProbabilityStore store;
EXPECT_EQ(0u, store.getNumObjects());
const std::size_t kProperty = 0;
- store.addProbability(kProperty, 0.5);
+ store.addOrUpdateObject(kProperty, 1);
EXPECT_EQ(1u, store.getNumObjects());
store.removeObject(kProperty);
EXPECT_EQ(0u, store.getNumObjects());
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
EXPECT_EQ(objects.size(), store.getNumObjects());
}
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
- EXPECT_EQ(probabilities[object_num],
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
store.getIndividualProbability(objects[object_num]));
}
}
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
const std::size_t kNumTrials = 10;
while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
}
}
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+ ProbabilityStore store;
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ store.getIndividualProbability(objects[object_num]));
+ }
+
+ // Remove last object "9" (numerator 5); the denominator and all remaining probabilities must be rescaled.
+ store.removeObject(objects.back());
+ objects.pop_back();
+ numerators.pop_back();
+ const float expected_new_denominator =
+ std::accumulate(numerators.begin(), numerators.end(), 0.0f);
+
+ EXPECT_EQ(expected_new_denominator, store.getDenominator());
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / expected_new_denominator,
+ store.getIndividualProbability(objects[object_num]));
+ }
+}
+
} // namespace quickstep