You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/09 18:06:18 UTC

[2/6] incubator-quickstep git commit: Minor refactored distributed query execution.

Minor refactored distributed query execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3011ddf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3011ddf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3011ddf6

Branch: refs/heads/hdfs_text_scan
Commit: 3011ddf61ec92efcb833ef0a1168255ff97fb9f9
Parents: 5773027
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 17:36:45 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 17:42:42 2017 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        |  1 -
 query_execution/PolicyEnforcerBase.cpp        |  2 -
 query_execution/PolicyEnforcerBase.hpp        | 14 -----
 query_execution/PolicyEnforcerDistributed.cpp | 59 ++++++++++------------
 4 files changed, 27 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 4d95f16..8c20e65 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -175,7 +175,6 @@ void ForemanDistributed::run() {
       case kQueryInitiateResponseMessage: {
         S::QueryInitiateResponseMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-        CHECK(policy_enforcer_->existQuery(proto.query_id()));
         break;
       }
       case kCatalogRelationNewBlockMessage:  // Fall through

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index a26b84e..082f6e9 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -156,8 +156,6 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
-
-  removed_query_ids_.insert(query_id);
 }
 
 bool PolicyEnforcerBase::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index baf9c68..4107817 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -103,16 +103,6 @@ class PolicyEnforcerBase {
   void processMessage(const TaggedMessage &tagged_message);
 
   /**
-   * @brief Check if the given query id ever exists.
-   *
-   * @return True if the query ever exists, otherwise false.
-   **/
-  inline bool existQuery(const std::size_t query_id) const {
-    return admitted_queries_.find(query_id) != admitted_queries_.end() ||
-           removed_query_ids_.find(query_id) != removed_query_ids_.end();
-  }
-
-  /**
    * @brief Check if there are any queries to be executed.
    *
    * @return True if there is at least one active or waiting query, false if
@@ -179,10 +169,6 @@ class PolicyEnforcerBase {
   // Key = query ID, value = QueryManagerBase* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
 
-  // TODO(quickstep-team): Delete a 'query_id' after receiving all
-  // 'QueryInitiateResponseMessage's for the 'query_id'.
-  std::unordered_set<std::size_t> removed_query_ids_;
-
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 49a1d9a..ef5abb0 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -68,8 +68,15 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
   // TODO(harshad) - Make this function generic enough so that it
   // works well when multiple queries are getting executed.
   if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
-    return;
+    if (waiting_queries_.empty()) {
+      LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
+      return;
+    } else {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
   }
 
   const std::size_t per_query_share =
@@ -106,28 +113,28 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
 }
 
 bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
-    // Ok to admit the query.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // NOTE(zuyu): Should call before constructing a 'QueryManager'.
-      // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
-      // initializes.
-      initiateQueryInShiftboss(query_handle);
-
-      // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;
-    }
-  } else {
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
     // This query will have to wait.
     waiting_queries_.push(query_handle);
     return false;
   }
+
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  // Ok to admit the query.
+  // NOTE(zuyu): Should call before constructing a 'QueryManager'.
+  // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+  // initializes.
+  initiateQueryInShiftboss(query_handle);
+
+  // Query with the same ID not present, ok to admit.
+  admitted_queries_[query_id].reset(
+      new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+  return true;
 }
 
 void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
@@ -144,18 +151,6 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   query_manager->processInitiateRebuildResponseMessage(
       proto.operator_index(), num_rebuild_work_orders, shiftboss_index);
   shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders);
-
-  if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-    onQueryCompletion(query_manager);
-
-    removeQuery(query_id);
-    if (!waiting_queries_.empty()) {
-      // Admit the earliest waiting query.
-      QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
-    }
-  }
 }
 
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(