You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/20 04:31:19 UTC

incubator-quickstep git commit: Changes for load controller experiment.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/memory-estimate 14622e255 -> 21a367f00


Changes for load controller experiment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/21a367f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/21a367f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/21a367f0

Branch: refs/heads/memory-estimate
Commit: 21a367f00debb314c9144dc6d5e00fdf3463c625
Parents: 14622e2
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jul 19 23:30:51 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Jul 19 23:30:51 2016 -0500

----------------------------------------------------------------------
 query_execution/Foreman.cpp                |  48 ++++++++-
 query_execution/Foreman.hpp                |  12 +++
 query_execution/PriorityPolicyEnforcer.cpp | 136 +++++++++++++-----------
 query_execution/PriorityPolicyEnforcer.hpp |   5 +-
 4 files changed, 136 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 40d6e5c..a7027d9 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include "cli/InputParserUtil.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -49,6 +50,8 @@ namespace quickstep {
 DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
+DEFINE_string(high_priority_queries_entry_points, "", "A comma separated list of entry points for high priority queries, each of which is defined in terms of milliseconds since the beginning of workload execution");
+DEFINE_uint64(num_high_priority_queries, 1, "Number of high priority queries to be admitted to the system");
 
 Foreman::Foreman(const tmb::client_id main_thread_client_id,
                  WorkerDirectory *worker_directory,
@@ -62,7 +65,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
       catalog_database_(DCHECK_NOTNULL(catalog_database)),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      start_time_(std::chrono::steady_clock::now()),
+      high_priority_queries_injection_points_(InputParserUtil::ParseWorkerAffinities(FLAGS_num_high_priority_queries, FLAGS_high_priority_queries_entry_points))
+      {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kPoisonMessage,
       kRebuildWorkOrderMessage,
@@ -95,6 +101,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       worker_directory_,
       bus_,
       profile_individual_workorders));
+
+  CHECK(FLAGS_num_high_priority_queries == high_priority_queries_injection_points_.size()) << "Number of high priority queries should be same as number of entry points";
+
+  high_priority_queries_admitted_.resize(FLAGS_num_high_priority_queries, false);
 }
 
 void Foreman::run() {
@@ -125,6 +135,15 @@ void Foreman::run() {
         const AdmitRequestMessage *msg =
             static_cast<const AdmitRequestMessage *>(tagged_message.message());
         const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+        vector<QueryHandle*> reduced_query_handles_list;
+        CHECK(query_handles.size() > FLAGS_num_high_priority_queries) << "Number of high priority queries should be less than total number of queries";
+        for (std::size_t i = 0; i < query_handles.size(); ++i) {
+          if (i < query_handles.size() - FLAGS_num_high_priority_queries) {
+            reduced_query_handles_list.push_back(query_handles[i]);
+          } else {
+            high_priority_query_handles_.push(query_handles[i]);
+          }
+        }
 
         DCHECK(!query_handles.empty());
         bool all_queries_admitted = true;
@@ -132,7 +151,7 @@ void Foreman::run() {
           all_queries_admitted =
               policy_enforcer_->admitQuery(query_handles.front());
         } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+          all_queries_admitted = policy_enforcer_->admitQueries(reduced_query_handles_list);
         }
         if (!all_queries_admitted) {
           LOG(WARNING) << "The scheduler could not admit all the queries";
@@ -157,6 +176,8 @@ void Foreman::run() {
       dispatchWorkerMessages(new_messages);
     }
 
+    checkAndAdmitHighPriorityQueries();
+
     // We check again, as some queries may produce zero work orders and finish
     // their execution.
     if (!policy_enforcer_->hasQueries()) {
@@ -252,4 +273,27 @@ void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
   }
 }
 
+bool Foreman::checkAndAdmitHighPriorityQueries() {
+  for (std::size_t i = 0; i < high_priority_queries_admitted_.size(); ++i) {
+    if (!high_priority_queries_admitted_[i]) {
+      // Check the timestamp.
+      auto time_in_millis = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - start_time_).count();
+      if (time_in_millis > high_priority_queries_injection_points_[i]) {
+        // Admit the query.
+        QueryHandle *next_query_handle = high_priority_query_handles_.front();
+        high_priority_query_handles_.pop();
+        if (!policy_enforcer_->admitQuery(next_query_handle)) {
+          LOG(INFO) << "Could not admit query with ID: " << next_query_handle->query_id();
+        }
+        high_priority_queries_admitted_[i] = true;
+        return true;
+      } else {
+        // Wait for some more time to admit this query.
+        return false;
+      }
+    }
+  }
+  return false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index c38a3e6..d2db48a 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 
+#include <chrono>
 #include <cstddef>
 #include <cstdio>
 #include <memory>
@@ -121,6 +122,8 @@ class Foreman final : public ForemanLite {
    **/
   bool canCollectNewMessages(const tmb::message_type_id message_type);
 
+  bool checkAndAdmitHighPriorityQueries();
+
   const tmb::client_id main_thread_client_id_;
 
   WorkerDirectory *worker_directory_;
@@ -130,6 +133,15 @@ class Foreman final : public ForemanLite {
 
   std::unique_ptr<PriorityPolicyEnforcer> policy_enforcer_;
 
+  // Start time for Foreman.
+  const std::chrono::steady_clock::time_point start_time_;
+  // Whether high priority queries have been admitted to the system.
+  std::vector<bool> high_priority_queries_admitted_;
+  // Injection times for the high priority queries, in milliseconds since start_time_.
+  const std::vector<int> high_priority_queries_injection_points_;
+  std::queue<QueryHandle*> high_priority_query_handles_;
+
+
   DISALLOW_COPY_AND_ASSIGN(Foreman);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 11b9d70..a238b8f 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,65 +62,68 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
       worker_directory_(worker_directory),
       bus_(bus),
       profile_individual_workorders_(profile_individual_workorders),
-      committed_memory_(0) {
+      committed_memory_(0),
+      suspended_memory_(0) {
   learner_.reset(new Learner());
 }
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
   // Find a victim query to be suspended.
-  while (!admissionMemoryCheck(query_handle)) {
-    std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
-    if (victim_query.first != kInvalidQueryID) {
-      // We need to suspend this query - move it from admitted to suspended.
-      suspendQuery(victim_query.first);
+  bool memory_available = admissionMemoryCheck(query_handle);
+  if (!memory_available) {
+    while (!memory_available) {
+      std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
+      if (victim_query.first != kInvalidQueryID) {
+        // We need to suspend this query - move it from admitted to suspended.
+        suspendQuery(victim_query.first);
+        memory_available = admissionMemoryCheck(query_handle);
+      } else {
+        std::cout << "No victim found, okay to admit query " << query_handle->query_id() << "\n";
+        break;
+      }
     }
   }
-  // if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
-    // Ok to admit the query.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
-                           catalog_database_, storage_manager_, bus_));
-      std::cout << "Admitted query with ID: " << query_handle->query_id()
-                 << " priority: " << query_handle->query_priority() << "\n";
-      priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
-      learner_->addQuery(*query_handle);
-      query_handle->setAdmissionTime();
-      query_id_to_handle_[query_handle->query_id()] = query_handle;
-      LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;
-    }
-  /*} else {
-    // This query will have to wait.
-    std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
-    if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
-      // This query was not waitlisted earlier.
-      query_id_to_handle_[query_handle->query_id()] = query_handle;
-      waiting_queries_.push(query_handle);
-    }
+  // Ok to admit the query.
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+    // Query with the same ID not present, ok to admit.
+    admitted_queries_[query_id].reset(
+        new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
+                         catalog_database_, storage_manager_, bus_));
+    std::cout << "Admitted query with ID: " << query_handle->query_id()
+               << " priority: " << query_handle->query_priority() << "\n";
+    priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
+    learner_->addQuery(*query_handle);
+    query_handle->setAdmissionTime();
+    query_id_to_handle_[query_handle->query_id()] = query_handle;
+    LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+    return true;
+  } else {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
     return false;
-  }*/
+  }
 }
 
 bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
- if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries)
+ if (admissionMemoryCheck(query_handle)) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
+    // suspendQuery() added this query's current footprint to suspended_memory_,
+    // so subtract it again now that the query is being re-admitted.
+    const std::size_t curr_query_footprint = getMemoryForQueryInBytes(query_id);
+    suspended_memory_ -= curr_query_footprint;
     if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
       // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
-                           catalog_database_, storage_manager_, bus_));
-      std::cout << "Admitted query with ID: " << query_handle->query_id()
+      // Don't create a new QueryManager instance, it has already been created.
+      // Just move it from suspended_query_managers_ to admitted_queries_.
+      DCHECK(suspended_query_managers_.find(query_id) != suspended_query_managers_.end());
+      DCHECK(suspended_query_managers_.at(query_id) != nullptr);
+      admitted_queries_[query_id].reset(suspended_query_managers_[query_id].release());
+      std::cout << "Admitted suspended query with ID: " << query_handle->query_id()
                  << " priority: " << query_handle->query_priority() << "\n";
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
-      query_handle->setAdmissionTime();
+      // query_handle->setAdmissionTime();
       query_id_to_handle_[query_handle->query_id()] = query_handle;
       LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
       return true;
@@ -130,13 +133,6 @@ bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
     }
   } else {
     // Let the query be in the suspended mode.
-    // This query will have to wait.
-    /*std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
-    if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
-      // This query was not waitlisted earlier.
-      query_id_to_handle_[query_handle->query_id()] = query_handle;
-      waiting_queries_.push(query_handle);
-    }*/
     return false;
   }
 }
@@ -214,23 +210,32 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
     default:
       LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer";
   }
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  const QueryManager::QueryStatusCode return_code =
-      admitted_queries_[query_id]->processMessage(tagged_message);
+  QueryManager::QueryStatusCode return_code = QueryManager::QueryStatusCode::kNone;
+  if (!hasQuerySuspended(query_id)) {
+    DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+       return_code = admitted_queries_[query_id]->processMessage(tagged_message);
+  } else {
+    DCHECK(suspended_query_managers_.find(query_id) != suspended_query_managers_.end());
+    return_code = suspended_query_managers_[query_id]->processMessage(tagged_message);
+  }
   // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
   if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!suspended_queries_.empty()) {
       // Admit a suspended query.
       QueryHandle *suspended_query = suspended_queries_.back();
-      if (admitSuspendedQuery(suspended_query)) {
+      while (admitSuspendedQuery(suspended_query)) {
+        // Keep re-admitting suspended queries for as long as admission succeeds.
         std::cout << "Admitting suspended query " << suspended_query->query_id() << " back\n";
         suspended_queries_.pop_back();
         suspended_query_managers_.erase(suspended_query->query_id());
-        return;
+        if (!suspended_queries_.empty()) {
+          suspended_query = suspended_queries_.back();
+        } else {
+          break;
+        }
       }
-    }
-    if (!waiting_queries_.empty()) {
+    } else if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
       QueryHandle *new_query = waiting_queries_.front();
       // waiting_queries_.pop();
@@ -345,6 +350,7 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
   // TODO(harshad) - Admit waiting queries, if any.
+  query_id_to_handle_.erase(query_id);
   DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
 }
 
@@ -368,12 +374,14 @@ void PriorityPolicyEnforcer::suspendQuery(const std::size_t query_id) {
     // No more queries for the given priority level. Remove the entry.
     priority_query_ids_.erase(query_priority_unsigned);
   }
-  // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
   const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+  // TODO(harshad) - Support actually evicting the memory used up by the suspended query.
   committed_memory_ -= estimated_memory_bytes;
+  const std::size_t curr_query_footprint = getMemoryForQueryInBytes(query_id);
+  suspended_memory_ += curr_query_footprint;
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
-  std::cout << "Suspended query: " << query_id << " with priority: " << query_priority;
+  std::cout << "Suspended query: " << query_id << " with priority: " << query_priority << "\n";
 }
 
 bool PriorityPolicyEnforcer::admitQueries(
@@ -461,13 +469,16 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
 bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {
   if (admitted_queries_.empty()) {
     // No query running in the system, let the query in.
+    const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
+    committed_memory_ += estimated_memory_requirement_bytes;
     return true;
   }
   const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
   const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
-  const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());
+  const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize() - ((suspended_memory_ > 0) ? suspended_memory_ : 0));
   const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);
-  if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {
+  /*std::cout << "Requested: " << std::max(committed_slots, current_slots) + estimated_slots << " Current: " << current_slots << " Limit: " << 0.8 * float(storage_manager_->getMaxBufferPoolSlots()) << "\n";*/
+  if (std::max(committed_slots, current_slots) + estimated_slots < (0.8 * float(storage_manager_->getMaxBufferPoolSlots()))) {
     committed_memory_ += estimated_memory_requirement_bytes;
     return true;
   }
@@ -478,8 +489,11 @@ const std::size_t PriorityPolicyEnforcer::getMemoryForQueryInBytes(const std::si
   DCHECK(query_id_to_handle_.find(query_id) != query_id_to_handle_.end());
   QueryHandle *query_handle = query_id_to_handle_[query_id];
   std::size_t memory = query_handle->getMemoryTempRelationsBytes();
-  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  memory += admitted_queries_[query_id]->getMemoryBytes();
+  if (!hasQuerySuspended(query_id)) {
+    memory += admitted_queries_[query_id]->getMemoryBytes();
+  } else {
+    memory += suspended_query_managers_[query_id]->getMemoryBytes();
+  }
   return memory;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21a367f0/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 0659230..08c3ada 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -138,7 +138,7 @@ class PriorityPolicyEnforcer {
    *         the policy enforcer doesn't have any query.
    **/
   inline bool hasQueries() const {
-    return !(admitted_queries_.empty() && waiting_queries_.empty());
+    return !(admitted_queries_.empty() && waiting_queries_.empty() && suspended_query_managers_.empty());
   }
 
   /**
@@ -249,7 +249,8 @@ class PriorityPolicyEnforcer {
 
   std::unique_ptr<Learner> learner_;
 
-  std::size_t committed_memory_;
+  long committed_memory_;
+  long suspended_memory_;
 
   DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
 };