You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/10 14:45:41 UTC
[20/31] incubator-quickstep git commit: Minor refactoring of distributed
query execution.
Minor refactoring of distributed query execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3011ddf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3011ddf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3011ddf6
Branch: refs/heads/reorder-partitioned-hash-join
Commit: 3011ddf61ec92efcb833ef0a1168255ff97fb9f9
Parents: 5773027
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 17:36:45 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 17:42:42 2017 -0800
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 1 -
query_execution/PolicyEnforcerBase.cpp | 2 -
query_execution/PolicyEnforcerBase.hpp | 14 -----
query_execution/PolicyEnforcerDistributed.cpp | 59 ++++++++++------------
4 files changed, 27 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 4d95f16..8c20e65 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -175,7 +175,6 @@ void ForemanDistributed::run() {
case kQueryInitiateResponseMessage: {
S::QueryInitiateResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
- CHECK(policy_enforcer_->existQuery(proto.query_id()));
break;
}
case kCatalogRelationNewBlockMessage: // Fall through
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index a26b84e..082f6e9 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -156,8 +156,6 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
<< " that hasn't finished its execution";
}
admitted_queries_.erase(query_id);
-
- removed_query_ids_.insert(query_id);
}
bool PolicyEnforcerBase::admitQueries(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index baf9c68..4107817 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -103,16 +103,6 @@ class PolicyEnforcerBase {
void processMessage(const TaggedMessage &tagged_message);
/**
- * @brief Check if the given query id ever exists.
- *
- * @return True if the query ever exists, otherwise false.
- **/
- inline bool existQuery(const std::size_t query_id) const {
- return admitted_queries_.find(query_id) != admitted_queries_.end() ||
- removed_query_ids_.find(query_id) != removed_query_ids_.end();
- }
-
- /**
* @brief Check if there are any queries to be executed.
*
* @return True if there is at least one active or waiting query, false if
@@ -179,10 +169,6 @@ class PolicyEnforcerBase {
// Key = query ID, value = QueryManagerBase* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
- // TODO(quickstep-team): Delete a 'query_id' after receiving all
- // 'QueryInitiateResponseMessage's for the 'query_id'.
- std::unordered_set<std::size_t> removed_query_ids_;
-
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 49a1d9a..ef5abb0 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -68,8 +68,15 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
// TODO(harshad) - Make this function generic enough so that it
// works well when multiple queries are getting executed.
if (admitted_queries_.empty()) {
- LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
- return;
+ if (waiting_queries_.empty()) {
+ LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
+ return;
+ } else {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
}
const std::size_t per_query_share =
@@ -106,28 +113,28 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
}
bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
- if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
- // Ok to admit the query.
- const std::size_t query_id = query_handle->query_id();
- if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
- // NOTE(zuyu): Should call before constructing a 'QueryManager'.
- // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
- // initializes.
- initiateQueryInShiftboss(query_handle);
-
- // Query with the same ID not present, ok to admit.
- admitted_queries_[query_id].reset(
- new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
- return true;
- } else {
- LOG(ERROR) << "Query with the same ID " << query_id << " exists";
- return false;
- }
- } else {
+ if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
// This query will have to wait.
waiting_queries_.push(query_handle);
return false;
}
+
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+ return false;
+ }
+
+ // Ok to admit the query.
+ // NOTE(zuyu): Should call before constructing a 'QueryManager'.
+ // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+ // initializes.
+ initiateQueryInShiftboss(query_handle);
+
+ // Query with the same ID not present, ok to admit.
+ admitted_queries_[query_id].reset(
+ new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+ return true;
}
void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
@@ -144,18 +151,6 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
query_manager->processInitiateRebuildResponseMessage(
proto.operator_index(), num_rebuild_work_orders, shiftboss_index);
shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders);
-
- if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- onQueryCompletion(query_manager);
-
- removeQuery(query_id);
- if (!waiting_queries_.empty()) {
- // Admit the earliest waiting query.
- QueryHandle *new_query = waiting_queries_.front();
- waiting_queries_.pop();
- admitQuery(new_query);
- }
- }
}
void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(