You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/30 20:49:55 UTC

[17/18] incubator-quickstep git commit: CLI support to admit a workload.

CLI support to admit a workload.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bfa31b0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bfa31b0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bfa31b0a

Branch: refs/heads/scheduler++
Commit: bfa31b0a22f7217e4fe93bff7ea7ccd1a2e6e4b8
Parents: 308e58a
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 29 15:59:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Jun 30 15:49:06 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                             |   2 -
 cli/CMakeLists.txt                         |   4 +
 cli/InputParserUtil.cpp                    |  30 +++
 cli/InputParserUtil.hpp                    |   6 +
 cli/QuickstepCli.cpp                       | 280 +++++++++++++++---------
 query_execution/CMakeLists.txt             |   1 +
 query_execution/Learner.cpp                |  11 +-
 query_execution/Learner.hpp                |   2 -
 query_execution/PriorityPolicyEnforcer.cpp |  38 +++-
 query_execution/PriorityPolicyEnforcer.hpp |   6 +-
 query_execution/ProbabilityStore.hpp       |   2 +-
 11 files changed, 262 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20e1fb9..9d3c413 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -741,10 +741,8 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_catalog_CatalogRelation
                       quickstep_cli_CommandExecutor
                       quickstep_cli_DefaultsConfigurator
-                      quickstep_cli_DropRelation
                       quickstep_cli_InputParserUtil
                       quickstep_cli_LineReader
-                      quickstep_cli_PrintToScreen
                       quickstep_parser_ParseStatement
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_AdmitRequestMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..a85d52c 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -113,6 +113,10 @@ if(QUICKSTEP_HAVE_LIBNUMA)
 endif()
 target_link_libraries(quickstep_cli_InputParserUtil
                       glog
+                      quickstep_cli_DropRelation
+                      quickstep_cli_PrintToScreen
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryProcessor
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 if(QUICKSTEP_HAVE_LIBNUMA)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 352883e..ffc997c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -24,6 +24,10 @@
 #include <vector>
 
 #include "catalog/CatalogConfig.h"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConfig.h"
 #include "utility/StringUtil.hpp"
 
@@ -36,6 +40,12 @@
 using std::string;
 
 namespace quickstep {
+  class CatalogRelation;
+  class CatalogDatabase;
+  class StorageManager;
+}
+
+namespace quickstep {
 
 std::vector<int> InputParserUtil::ParseWorkerAffinities(
     const int num_workers,
@@ -87,4 +97,24 @@ std::vector<int> InputParserUtil::GetNUMANodesForCPUs() {
   return numa_nodes_of_cpus;
 }
 
+void InputParserUtil::PrintAndDropOutputRelation(  // Prints a query's result relation (if it has one) to stdout, then drops it.
+    QueryHandle *query_handle, QueryProcessor *query_processor) {  // Both pointers are dereferenced without a null check.
+  const CatalogRelation *query_result_relation =
+      query_handle->getQueryResultRelation();  // May be nullptr; in that case nothing is printed or dropped.
+  if (query_result_relation != nullptr) {
+    PrintToScreen::PrintRelation(*query_result_relation,  // Write the relation itself to stdout.
+                                 query_processor->getStorageManager(),
+                                 stdout);
+    PrintToScreen::PrintOutputSize(  // Follow it with the output-size report, also on stdout.
+        *query_result_relation,
+        query_processor->getStorageManager(),
+        stdout);
+
+    DropRelation::Drop(*query_result_relation,  // Drop only after printing; uses the processor's default database and storage manager.
+                       query_processor->getDefaultDatabase(),
+                       query_processor->getStorageManager());
+  }
+
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/InputParserUtil.hpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.hpp b/cli/InputParserUtil.hpp
index ebb32d2..42e9804 100644
--- a/cli/InputParserUtil.hpp
+++ b/cli/InputParserUtil.hpp
@@ -24,6 +24,9 @@
 
 namespace quickstep {
 
+class QueryHandle;
+class QueryProcessor;
+
 /** \addtogroup CLI
  *  @{
  */
@@ -60,6 +63,9 @@ class InputParserUtil {
    **/
   static std::vector<int> GetNUMANodesForCPUs();
 
+  static void PrintAndDropOutputRelation(QueryHandle *query_handle,
+                                         QueryProcessor *query_processor);
+
  private:
   /**
    * @brief Private constructor to disable instantiation of the class.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index d7b687e..3010ccc 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -161,6 +161,8 @@ DEFINE_bool(initialize_db, false, "If true, initialize a database.");
 DEFINE_bool(print_query, false,
             "Print each input query statement. This is useful when running a "
             "large number of queries in a batch.");
+DEFINE_bool(accept_workload, false, "If true, accept a workload through CLI, "
+            "otherwise execute one query at a time");
 DEFINE_string(profile_file_name, "",
               "If nonempty, enable profiling using GOOGLE CPU Profiler, and write "
               "its output to the given file name. This flag has no effect if "
@@ -378,129 +380,209 @@ int main(int argc, char* argv[]) {
 #ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
   bool started_profiling = false;
 #endif
-  for (;;) {
-    string *command_string = new string();
-    *command_string = line_reader.getNextCommand();
-    if (command_string->size() == 0) {
-      delete command_string;
-      break;
-    }
+  if (!quickstep::FLAGS_accept_workload) {
+    for (;;) {
+      string *command_string = new string();
+      *command_string = line_reader.getNextCommand();
+      if (command_string->size() == 0) {
+        delete command_string;
+        break;
+      }
 
-    if (quickstep::FLAGS_print_query) {
-      printf("\n%s\n", command_string->c_str());
-    }
+      if (quickstep::FLAGS_print_query) {
+        printf("\n%s\n", command_string->c_str());
+      }
 
-    parser_wrapper->feedNextBuffer(command_string);
+      parser_wrapper->feedNextBuffer(command_string);
+
+      bool quitting = false;
+      // A parse error should reset the parser. This is because the thrown quickstep
+      // SqlError does not do the proper reset work of the YYABORT macro.
+      bool reset_parser = false;
+      for (;;) {
+        ParseResult result = parser_wrapper->getNextStatement();
+        if (result.condition == ParseResult::kSuccess) {
+          if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
+            quitting = true;
+            break;
+          }
 
-    bool quitting = false;
-    // A parse error should reset the parser. This is because the thrown quickstep
-    // SqlError does not do the proper reset work of the YYABORT macro.
-    bool reset_parser = false;
-    for (;;) {
-      ParseResult result = parser_wrapper->getNextStatement();
-      if (result.condition == ParseResult::kSuccess) {
-        if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
-          quitting = true;
-          break;
-        }
+          if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+            try {
+              quickstep::cli::executeCommand(
+                  *result.parsed_statement,
+                  *(query_processor->getDefaultDatabase()),
+                  main_thread_client_id,
+                  foreman.getBusClientID(),
+                  &bus,
+                  query_processor->getStorageManager(),
+                  query_processor.get(),
+                  stdout);
+            } catch (const quickstep::SqlError &sql_error) {
+              fprintf(stderr, "%s",
+                      sql_error.formatMessage(*command_string).c_str());
+              reset_parser = true;
+              break;
+            }
+          continue;
+          }
 
-        if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+          std::unique_ptr<QueryHandle> query_handle;
           try {
-            quickstep::cli::executeCommand(
-                *result.parsed_statement,
-                *(query_processor->getDefaultDatabase()),
-                main_thread_client_id,
-                foreman.getBusClientID(),
-                &bus,
-                query_processor->getStorageManager(),
-                query_processor.get(),
-                stdout);
+            query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
           } catch (const quickstep::SqlError &sql_error) {
-            fprintf(stderr, "%s",
-                    sql_error.formatMessage(*command_string).c_str());
+            fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
             reset_parser = true;
             break;
           }
-        continue;
-        }
 
-        std::unique_ptr<QueryHandle> query_handle;
-        try {
-          query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
-        } catch (const quickstep::SqlError &sql_error) {
-          fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+          DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+          std::vector<QueryHandle*> query_handles;
+          query_handles.push_back(query_handle.get());
+          start = std::chrono::steady_clock::now();
+          QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+              main_thread_client_id,
+              foreman.getBusClientID(),
+              &query_handles,
+              &bus);
+
+          try {
+            QueryExecutionUtil::ReceiveQueryCompletionMessage(
+                main_thread_client_id, &bus);
+            end = std::chrono::steady_clock::now();
+
+            InputParserUtil::PrintAndDropOutputRelation(query_handle.get(), query_processor.get());
+            query_processor->saveCatalog();
+            std::chrono::duration<double, std::milli> time_ms = end - start;
+            printf("Time: %s ms\n",
+                   quickstep::DoubleToStringWithSignificantDigits(
+                       time_ms.count(), 3).c_str());
+            if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+              // TODO(harshad) - Allow user specified file instead of stdout.
+              foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+                                                     stdout);
+            }
+          } catch (const std::exception &e) {
+            fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+            break;
+          }
+        } else {
+          if (result.condition == ParseResult::kError) {
+            fprintf(stderr, "%s", result.error_message.c_str());
+          }
           reset_parser = true;
           break;
         }
+#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
+        // Profile only if profile_file_name flag is set
+        if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
+          started_profiling = true;
+          ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+        }
+#endif
+      }
 
-        DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-        std::vector<QueryHandle*> query_handles;
-        query_handles.push_back(query_handle.get());
-        start = std::chrono::steady_clock::now();
-        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-            main_thread_client_id,
-            foreman.getBusClientID(),
-            &query_handles,
-            &bus);
-
-        try {
-          QueryExecutionUtil::ReceiveQueryCompletionMessage(
-              main_thread_client_id, &bus);
-          end = std::chrono::steady_clock::now();
-
-          const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
-          if (query_result_relation) {
-            PrintToScreen::PrintRelation(*query_result_relation,
-                                         query_processor->getStorageManager(),
-                                         stdout);
-            PrintToScreen::PrintOutputSize(
-                *query_result_relation,
-                query_processor->getStorageManager(),
-                stdout);
-
-            DropRelation::Drop(*query_result_relation,
-                               query_processor->getDefaultDatabase(),
-                               query_processor->getStorageManager());
+      if (quitting) {
+        break;
+      } else if (reset_parser) {
+        parser_wrapper.reset(new SqlParserWrapper());
+        reset_parser = false;
+      }
+    }
+  } else {
+    std::vector<QueryHandle*> query_handles;
+    for (;;) {
+      bool end_of_input = false;
+      // A parse error should reset the parser. This is because the thrown quickstep
+      // SqlError does not do the proper reset work of the YYABORT macro.
+      bool reset_parser = false;
+      string *command_string = new string();
+      *command_string = line_reader.getNextCommand();
+      if (command_string->size() == 0) {
+        delete command_string;
+        end_of_input = true;
+        reset_parser = true;
+      } else {
+        if (quickstep::FLAGS_print_query) {
+          printf("\n%s\n", command_string->c_str());
+        }
+        parser_wrapper->feedNextBuffer(command_string);
+        end_of_input = false;
+        reset_parser = false;
+      }
+      for (;;) {
+        ParseResult result = parser_wrapper->getNextStatement();
+        // Check if the input has ended.
+        if (end_of_input && (result.condition == ParseResult::kEndOfInput ||
+                             (result.condition == ParseResult::kSuccess &&
+                              result.parsed_statement->getStatementType() ==
+                                  ParseStatement::kQuit))) {
+          if (!query_handles.empty()) {
+            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+                main_thread_client_id,
+                foreman.getBusClientID(),
+                &query_handles,
+                &bus);
+            try {
+              QueryExecutionUtil::ReceiveQueryCompletionMessage(
+                  main_thread_client_id, &bus);
+
+              for (std::size_t i = 0; i < query_handles.size(); ++i) {
+                InputParserUtil::PrintAndDropOutputRelation(
+                    query_handles[i], query_processor.get());
+              }
+              query_processor->saveCatalog();
+              if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+                // TODO(harshad) - Allow user specified file instead of stdout.
+                for (std::size_t i = 0; i < query_handles.size(); ++i) {
+                  foreman.printWorkOrderProfilingResults(
+                      query_handles[i]->query_id(), stdout);
+                }
+              }
+            } catch (const std::exception &e) {
+              fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+              break;
+            }
           }
-
-          query_processor->saveCatalog();
-          std::chrono::duration<double, std::milli> time_ms = end - start;
-          printf("Time: %s ms\n",
-                 quickstep::DoubleToStringWithSignificantDigits(
-                     time_ms.count(), 3).c_str());
-          if (quickstep::FLAGS_profile_and_report_workorder_perf) {
-            // TODO(harshad) - Allow user specified file instead of stdout.
-            foreman.printWorkOrderProfilingResults(query_handle->query_id(),
-                                                   stdout);
+          reset_parser = true;
+          break;
+        } else if (result.condition == ParseResult::kSuccess) {
+          if (result.parsed_statement->getStatementType() == ParseStatement::kSelect) {
+            std::unique_ptr<QueryHandle> query_handle;
+            try {
+              query_handle.reset(query_processor->generateQueryHandle(
+                  *result.parsed_statement));
+            } catch (const quickstep::SqlError &sql_error) {
+              fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+              reset_parser = true;
+              break;
+            }
+
+            DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+            query_handles.push_back(query_handle.release());
+          } else {
+            LOG(INFO) << "Only select queries are accepted in the workload";
           }
-        } catch (const std::exception &e) {
-          fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
           break;
-        }
-      } else {
-        if (result.condition == ParseResult::kError) {
+        } else if (result.condition == ParseResult::kError) {
           fprintf(stderr, "%s", result.error_message.c_str());
+          reset_parser = true;
+          break;
+        } else {
+          LOG(FATAL) << "Unhandled case";
+          break;
         }
-        reset_parser = true;
-        break;
       }
-#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
-      // Profile only if profile_file_name flag is set
-      if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
-        started_profiling = true;
-        ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+      if (end_of_input) {
+        break;
+      } else if (reset_parser) {
+        parser_wrapper.reset(new SqlParserWrapper());
+        reset_parser = false;
       }
-#endif
-    }
-
-    if (quitting) {
-      break;
-    } else if (reset_parser) {
-      parser_wrapper.reset(new SqlParserWrapper());
-      reset_parser = false;
     }
   }
 
+
 #ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
   if (started_profiling) {
     ProfilerStop();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 104f9da..13b74e3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -242,6 +242,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_PriorityPolicyEnforcer
+                      quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryexecution_QueryExecutionMessages_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index bb24baa..9801f60 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -101,11 +101,12 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     // queries.
     DCHECK(mean_workorders_per_query.find(query_id) !=
            mean_workorders_per_query.end());
-    DCHECK_NE(mean_workorders_per_query[query_id], 0);
-    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
-        query_id,
-        1 / static_cast<float>(mean_workorders_per_query[query_id]),
-        denominator);
+    if (mean_workorders_per_query[query_id] != 0) {
+      current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+          query_id,
+          1 / static_cast<float>(mean_workorders_per_query[query_id]),
+          denominator);
+    }
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8634842..ef92db9 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -451,8 +451,6 @@ class Learner {
     for (const auto &element : mean_workorder_per_query) {
       if (element.second != 0) {
         denominator += 1/static_cast<float>(element.second);
-      /*} else {
-        return 0;*/
       }
     }
     return denominator;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index f9a741d..0a15094 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,6 +62,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
+    LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
     waiting_queries_.push(query_handle);
     return false;
   }
@@ -166,7 +167,7 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     return;
   }
   DCHECK_GT(per_query_share, 0u);
-  std::vector<std::size_t> finished_queries_ids;
+  std::unordered_map<std::size_t, bool> finished_queries_ids;
 
   if (learner_->hasActiveQueries()) {
     // Key = priority level. Value = Whether we have already checked the
@@ -196,8 +197,8 @@ void PriorityPolicyEnforcer::getWorkerMessages(
     DLOG(INFO) << "No active queries in the learner at this point.";
     return;
   }
-  for (const std::size_t finished_qid : finished_queries_ids) {
-    removeQuery(finished_qid);
+  for (auto finished_qid_pair : finished_queries_ids) {
+    removeQuery(finished_qid_pair.first);
   }
 }
 
@@ -225,6 +226,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
   }
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
+  LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
+  // Admit waiting queries, if any.
 }
 
 bool PriorityPolicyEnforcer::admitQueries(
@@ -251,7 +254,7 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
 
 WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
     const std::size_t priority_level,
-    std::vector<std::size_t> *finished_queries_ids) {
+    std::unordered_map<std::size_t, bool> *finished_queries_ids) {
   // Key = query ID from the given priority level, value = whether we have
   // checked this query earlier.
   std::unordered_map<std::size_t, bool> checked_query_ids;
@@ -262,9 +265,28 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
       // No query available at this time in this priority level.
       return nullptr;
     } else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
-      // We have already seen this query ID, try one more time.
-      LOG(INFO) << "We have already seen this query, continue";
-      continue;
+      // Find a query from the same priority level, but not present in the
+      // checked_query_ids map.
+      for (const std::size_t qid : priority_query_ids_[priority_level]) {
+        if (checked_query_ids.find(qid) == checked_query_ids.end() &&
+            finished_queries_ids->find(qid) == finished_queries_ids->end()) {
+          // Query not seen already.
+          QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(qid)].get();
+          DCHECK(chosen_query_manager != nullptr);
+          std::unique_ptr<WorkerMessage> next_worker_message(
+              chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+          if (next_worker_message != nullptr) {
+            // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
+            return next_worker_message.release();
+          } else {
+            // This query doesn't have any WorkerMessage right now. Mark as checked.
+            checked_query_ids[qid] = true;
+            if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+              (*finished_queries_ids)[static_cast<std::size_t>(qid)] = true;
+            }
+          }
+        }
+      }
     } else {
       // We haven't seen this query earlier. Check if it has any schedulable
       // WorkOrder.
@@ -277,7 +299,7 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
         // This query doesn't have any WorkerMessage right now. Mark as checked.
         checked_query_ids[chosen_query_id] = true;
         if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-          finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+          (*finished_queries_ids)[static_cast<std::size_t>(chosen_query_id)] = true;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 281c066..eafb099 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -170,7 +170,7 @@ class PriorityPolicyEnforcer {
   }
 
  private:
-  static constexpr std::size_t kMaxConcurrentQueries = 2;
+  static constexpr std::size_t kMaxConcurrentQueries = 100;
 
   /**
    * @brief Record the execution time for a finished WorkOrder.
@@ -188,7 +188,7 @@ class PriorityPolicyEnforcer {
    *
    * @param priority_level The priority level from which the query will be
    *        chosen.
-   * @param finished_query_ids A vector of query IDs that have finished their
+   * @param finished_query_ids A map of query IDs that have finished their
    *        execution.
    *
    * @return A WorkerMessage. If no query can be chosen from this priority level,
@@ -196,7 +196,7 @@ class PriorityPolicyEnforcer {
    **/
   WorkerMessage *getNextWorkerMessageFromPriorityLevel(
       const std::size_t priority_level,
-      std::vector<std::size_t> *finished_queries_ids);
+      std::unordered_map<std::size_t, bool> *finished_queries_ids);
 
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bfa31b0a/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 7278e2b..4ae085e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -286,7 +286,7 @@ class ProbabilityStore {
     for (auto it = individual_probabilities_.begin();
          it != individual_probabilities_.end();
          ++it) {
-      DCHECK_LE(it->second.first, common_denominator_);
+      // DCHECK_LE(it->second.first, common_denominator_);
       it->second.second = it->second.first / common_denominator_;
     }
     updateCumulativeProbabilities();