You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/07 17:14:51 UTC
incubator-quickstep git commit: Bool flag to toggle between static and dynamic
Repository: incubator-quickstep
Updated Branches:
refs/heads/scheduler++ bc4d2ced9 -> 475da429e
Bool flag to toggle between static and dynamic
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/475da429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/475da429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/475da429
Branch: refs/heads/scheduler++
Commit: 475da429ee31e9a7e4f84b4c5e2aa2dbcff132f2
Parents: bc4d2ce
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jul 7 12:14:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jul 7 12:14:25 2016 -0500
----------------------------------------------------------------------
query_execution/Foreman.cpp | 2 +-
query_execution/Learner.cpp | 2 +-
query_execution/Learner.hpp | 60 ++++++++++++++++++-------
query_execution/PriorityPolicyEnforcer.cpp | 28 +++++++++++-
query_execution/PriorityPolicyEnforcer.hpp | 11 +----
5 files changed, 73 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index a5e8894..40d6e5c 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -46,7 +46,7 @@ using std::vector;
namespace quickstep {
-DEFINE_uint64(min_load_per_worker, 1, "The minimum load defined as the number "
+DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
"of pending work orders for the worker. This information is used "
"by the Foreman to assign work orders to worker threads");
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 56e9626..ed5b394 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -52,13 +52,13 @@ void Learner::addCompletionFeedback(
&workorder_completion_proto) {
const std::size_t query_id = workorder_completion_proto.query_id();
DCHECK(isQueryPresent(query_id));
- const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
ExecutionStats *execution_stats = getExecutionStats(query_id);
DCHECK(execution_stats != nullptr);
execution_stats->addEntry(
workorder_completion_proto.execution_time_in_microseconds(),
workorder_completion_proto.operator_index());
+ const std::size_t priority_level = getQueryPriorityUnsafe(query_id);
if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
updateFeedbackFromQueriesInPriorityLevel(priority_level);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8654544..03c692b 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -40,7 +40,6 @@ namespace quickstep {
/** \addtogroup QueryExecution
* @{
*/
-
class Learner {
public:
/**
@@ -151,6 +150,8 @@ class Learner {
*
* @note We use uniform random distribution.
*
+ * TODO(harshad) - Use a default_probabilities_of_priority_levels data structure.
+ *
* @return A priority level. If no queries are present in the learner, return
* kInvalidPriorityLevel.
**/
@@ -177,7 +178,8 @@ class Learner {
const int random_priority_level = pickRandomPriorityLevel();
// Note : The valid priority level values are non-zero.
DCHECK_GT(random_priority_level, 0);
- const int result = pickRandomQueryFromPriorityLevel(
+ // TODO(harshad) - Make this function a template too.
+ const int result = pickRandomQueryFromPriorityLevel<true>(
static_cast<std::size_t>(random_priority_level));
return result;
} else {
@@ -193,25 +195,51 @@ class Learner {
* @return A query ID. If no queries are present for this priority level in
* the learner, return kInvalidQueryID.
**/
+ template <bool dynamic_probabilities>
inline const int pickRandomQueryFromPriorityLevel(
const std::size_t priority_level) const {
DCHECK(isPriorityLevelPresent(priority_level));
- if (hasActiveQueries()) {
- if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
- DCHECK(current_probabilities_.at(priority_level) != nullptr);
- const auto it = current_probabilities_.find(priority_level);
- if (it->second->getNumObjects() > 0) {
- return static_cast<int>(
- current_probabilities_.at(priority_level)->pickRandomProperty());
- }
- } else {
- DCHECK(default_probabilities_.at(priority_level) != nullptr);
- const auto it = default_probabilities_.find(priority_level);
- if (it->second->getNumObjects() > 0) {
- return static_cast<int>(
- default_probabilities_.at(priority_level)->pickRandomProperty());
+ if (dynamic_probabilities) {
+ if (hasActiveQueries()) {
+ if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+ DCHECK(current_probabilities_.at(priority_level) != nullptr);
+ const auto it = current_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ current_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ } else {
+ DCHECK(default_probabilities_.at(priority_level) != nullptr);
+ const auto it = default_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ default_probabilities_.at(priority_level)->pickRandomProperty());
+ }
}
}
+ return kInvalidQueryID;
+ } else {
+ return pickRandomQueryFromPriorityLevelDefaultProbs(priority_level);
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from a given priority level using the default
+ * probabilities.
+ *
+ * @note We use uniform random distribution.
+ *
+ * @return A query ID. If no queries are present for this priority level in
+ * the learner, return kInvalidQueryID.
+ **/
+ inline const int pickRandomQueryFromPriorityLevelDefaultProbs(const std::size_t priority_level) const {
+ if (hasActiveQueries()) {
+ DCHECK(default_probabilities_.at(priority_level) != nullptr);
+ const auto it = default_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ default_probabilities_.at(priority_level)->pickRandomProperty());
+ }
}
return kInvalidQueryID;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 7d0d46f..36e2624 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -38,10 +38,29 @@
namespace quickstep {
-DEFINE_uint64(max_msgs_per_dispatch_round, 80, "Maximum number of messages that"
+DEFINE_uint64(max_msgs_per_dispatch_round, 40, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");
+DEFINE_bool(dynamic_probabilities_in_learner, true, "Whether the learner should have dynamic probabilities or static probabilities");
+
+PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_client_id,
+ const std::size_t num_numa_nodes,
+ CatalogDatabaseLite *catalog_database,
+ StorageManager *storage_manager,
+ WorkerDirectory *worker_directory,
+ tmb::MessageBus *bus,
+ const bool profile_individual_workorders)
+ : foreman_client_id_(foreman_client_id),
+ num_numa_nodes_(num_numa_nodes),
+ catalog_database_(catalog_database),
+ storage_manager_(storage_manager),
+ worker_directory_(worker_directory),
+ bus_(bus),
+ profile_individual_workorders_(profile_individual_workorders) {
+ learner_.reset(new Learner());
+}
+
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
@@ -253,7 +272,12 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
std::unordered_map<std::size_t, bool> checked_query_ids;
// While there are more queries to be checked ..
while (checked_query_ids.size() < priority_query_ids_[priority_level].size()) {
- const int chosen_query_id = learner_->pickRandomQueryFromPriorityLevel(priority_level);
+ int chosen_query_id;
+ if (FLAGS_dynamic_probabilities_in_learner) {
+ chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<true>(priority_level);
+ } else {
+ chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<false>(priority_level);
+ }
if (chosen_query_id == kInvalidQueryID) {
// No query available at this time in this priority level.
return nullptr;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/475da429/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 1f13a10..b20d831 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -65,16 +65,7 @@ class PriorityPolicyEnforcer {
StorageManager *storage_manager,
WorkerDirectory *worker_directory,
tmb::MessageBus *bus,
- const bool profile_individual_workorders = false)
- : foreman_client_id_(foreman_client_id),
- num_numa_nodes_(num_numa_nodes),
- catalog_database_(catalog_database),
- storage_manager_(storage_manager),
- worker_directory_(worker_directory),
- bus_(bus),
- profile_individual_workorders_(profile_individual_workorders) {
- learner_.reset(new Learner());
- }
+ const bool profile_individual_workorders = false);
/**
* @brief Destructor.