You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/14 04:20:58 UTC

incubator-quickstep git commit: Check memory availability before admitting a query.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/memory-estimate 1018c56b4 -> 0b973b38e


Check memory availability before admitting a query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0b973b38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0b973b38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0b973b38

Branch: refs/heads/memory-estimate
Commit: 0b973b38efdbdbe287755d1dde629d35e8268859
Parents: 1018c56
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jul 13 23:19:58 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jul 13 23:19:58 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  1 +
 query_execution/PriorityPolicyEnforcer.cpp | 51 +++++++++++++++++++------
 query_execution/PriorityPolicyEnforcer.hpp |  4 ++
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 9ab86b2..11e0e1d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageManager
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 3d151db..f426568 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -17,6 +17,7 @@
 
 #include "query_execution/PriorityPolicyEnforcer.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <queue>
@@ -32,6 +33,7 @@
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageManager.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -57,12 +59,13 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
       storage_manager_(storage_manager),
       worker_directory_(worker_directory),
       bus_(bus),
-      profile_individual_workorders_(profile_individual_workorders) {
+      profile_individual_workorders_(profile_individual_workorders),
+      committed_memory_(0) {
   learner_.reset(new Learner());
 }
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < kMaxConcurrentQueries) {
+  if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
     if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
@@ -70,13 +73,14 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id()
-                 << " priority: " << query_handle->query_priority();
+      std::cout << "Admitted query with ID: " << query_handle->query_id()
+                 << " priority: " << query_handle->query_priority() << "\n";
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       query_handle->setAdmissionTime();
       query_id_to_handle_[query_handle->query_id()] = query_handle;
       LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+      LOG(INFO) << "Query " << query_handle->query_id() << " memory admissible? " << admissionMemoryCheck(query_handle);
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -84,9 +88,12 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
-    DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
-    query_id_to_handle_[query_handle->query_id()] = query_handle;
-    waiting_queries_.push(query_handle);
+    std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
+    if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
+      // This query was not waitlisted earlier.
+      query_id_to_handle_[query_handle->query_id()] = query_handle;
+      waiting_queries_.push(query_handle);
+    }
     return false;
   }
 }
@@ -166,8 +173,11 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
       QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
+      // waiting_queries_.pop();
+      if (admitQuery(new_query)) {
+        std::cout << "Removing Q " << new_query->query_id() << " from waitlist\n";
+        waiting_queries_.pop();
+      }
     }
   } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
     learner_->removeOperator(query_id, operator_id);
@@ -237,6 +247,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
     priority_query_ids_.erase(query_priority_unsigned);
   }
   query_id_to_handle_[query_id]->setCompletionTime();
+  const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+  committed_memory_ -= estimated_memory_bytes;
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
   // TODO(harshad) - Admit waiting queries, if any.
@@ -245,12 +257,13 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
 
 bool PriorityPolicyEnforcer::admitQueries(
     const std::vector<QueryHandle*> &query_handles) {
+  bool result = true;
   for (QueryHandle *curr_query : query_handles) {
     if (!admitQuery(curr_query)) {
-      return false;
+      result = false;
     }
   }
-  return true;
+  return result;
 }
 
 void PriorityPolicyEnforcer::recordTimeForWorkOrder(
@@ -324,4 +337,20 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
   return nullptr;
 }
 
+bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {
+  if (admitted_queries_.empty()) {
+    // No query running in the system, let the query in.
+    return true;
+  }
+  const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
+  const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
+  const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());
+  const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);
+  if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {
+    committed_memory_ += estimated_memory_requirement_bytes;
+    return true;
+  }
+  return false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0b973b38/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index b20d831..4d5eb6a 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -189,6 +189,8 @@ class PriorityPolicyEnforcer {
       const std::size_t priority_level,
       std::unordered_map<std::size_t, bool> *finished_queries_ids);
 
+  bool admissionMemoryCheck(const QueryHandle *query_handle);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -226,6 +228,8 @@ class PriorityPolicyEnforcer {
 
   std::unique_ptr<Learner> learner_;
 
+  std::size_t committed_memory_;
+
   DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
 };