You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/02 23:17:52 UTC
[16/20] incubator-quickstep git commit: CLI support to admit a workload.
CLI support to admit a workload.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c53cfc9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c53cfc9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c53cfc9d
Branch: refs/heads/scheduler++
Commit: c53cfc9da191247f5a748dbcdbb40863a8946b10
Parents: 6718adf
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 29 15:59:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jul 2 18:16:30 2016 -0500
----------------------------------------------------------------------
CMakeLists.txt | 2 -
cli/CMakeLists.txt | 4 +
cli/InputParserUtil.cpp | 30 +++
cli/InputParserUtil.hpp | 6 +
cli/QuickstepCli.cpp | 280 +++++++++++++++---------
query_execution/CMakeLists.txt | 1 +
query_execution/Learner.cpp | 11 +-
query_execution/Learner.hpp | 2 -
query_execution/PriorityPolicyEnforcer.cpp | 38 +++-
query_execution/PriorityPolicyEnforcer.hpp | 6 +-
query_execution/ProbabilityStore.hpp | 2 +-
11 files changed, 262 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 20e1fb9..9d3c413 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -741,10 +741,8 @@ target_link_libraries(quickstep_cli_shell
quickstep_catalog_CatalogRelation
quickstep_cli_CommandExecutor
quickstep_cli_DefaultsConfigurator
- quickstep_cli_DropRelation
quickstep_cli_InputParserUtil
quickstep_cli_LineReader
- quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
quickstep_parser_SqlParserWrapper
quickstep_queryexecution_AdmitRequestMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 44ec223..a85d52c 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -113,6 +113,10 @@ if(QUICKSTEP_HAVE_LIBNUMA)
endif()
target_link_libraries(quickstep_cli_InputParserUtil
glog
+ quickstep_cli_DropRelation
+ quickstep_cli_PrintToScreen
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryProcessor
quickstep_utility_Macros
quickstep_utility_StringUtil)
if(QUICKSTEP_HAVE_LIBNUMA)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 352883e..ffc997c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -24,6 +24,10 @@
#include <vector>
#include "catalog/CatalogConfig.h"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
#include "storage/StorageConfig.h"
#include "utility/StringUtil.hpp"
@@ -36,6 +40,12 @@
using std::string;
namespace quickstep {
+ class CatalogRelation;
+ class CatalogDatabase;
+ class StorageManager;
+}
+
+namespace quickstep {
std::vector<int> InputParserUtil::ParseWorkerAffinities(
const int num_workers,
@@ -87,4 +97,24 @@ std::vector<int> InputParserUtil::GetNUMANodesForCPUs() {
return numa_nodes_of_cpus;
}
+void InputParserUtil::PrintAndDropOutputRelation(
+ QueryHandle *query_handle, QueryProcessor *query_processor) {
+ const CatalogRelation *query_result_relation =
+ query_handle->getQueryResultRelation();
+ if (query_result_relation != nullptr) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ query_processor->getStorageManager(),
+ stdout);
+ PrintToScreen::PrintOutputSize(
+ *query_result_relation,
+ query_processor->getStorageManager(),
+ stdout);
+
+ DropRelation::Drop(*query_result_relation,
+ query_processor->getDefaultDatabase(),
+ query_processor->getStorageManager());
+ }
+
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/cli/InputParserUtil.hpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.hpp b/cli/InputParserUtil.hpp
index ebb32d2..42e9804 100644
--- a/cli/InputParserUtil.hpp
+++ b/cli/InputParserUtil.hpp
@@ -24,6 +24,9 @@
namespace quickstep {
+class QueryHandle;
+class QueryProcessor;
+
/** \addtogroup CLI
* @{
*/
@@ -60,6 +63,9 @@ class InputParserUtil {
**/
static std::vector<int> GetNUMANodesForCPUs();
+ static void PrintAndDropOutputRelation(QueryHandle *query_handle,
+ QueryProcessor *query_processor);
+
private:
/**
* @brief Private constructor to disable instantiation of the class.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index d7b687e..3010ccc 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -161,6 +161,8 @@ DEFINE_bool(initialize_db, false, "If true, initialize a database.");
DEFINE_bool(print_query, false,
"Print each input query statement. This is useful when running a "
"large number of queries in a batch.");
+DEFINE_bool(accept_workload, false, "If true, accept a workload through CLI, "
+ "otherwise execute one query at a time");
DEFINE_string(profile_file_name, "",
"If nonempty, enable profiling using GOOGLE CPU Profiler, and write "
"its output to the given file name. This flag has no effect if "
@@ -378,129 +380,209 @@ int main(int argc, char* argv[]) {
#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
bool started_profiling = false;
#endif
- for (;;) {
- string *command_string = new string();
- *command_string = line_reader.getNextCommand();
- if (command_string->size() == 0) {
- delete command_string;
- break;
- }
+ if (!quickstep::FLAGS_accept_workload) {
+ for (;;) {
+ string *command_string = new string();
+ *command_string = line_reader.getNextCommand();
+ if (command_string->size() == 0) {
+ delete command_string;
+ break;
+ }
- if (quickstep::FLAGS_print_query) {
- printf("\n%s\n", command_string->c_str());
- }
+ if (quickstep::FLAGS_print_query) {
+ printf("\n%s\n", command_string->c_str());
+ }
- parser_wrapper->feedNextBuffer(command_string);
+ parser_wrapper->feedNextBuffer(command_string);
+
+ bool quitting = false;
+ // A parse error should reset the parser. This is because the thrown quickstep
+ // SqlError does not do the proper reset work of the YYABORT macro.
+ bool reset_parser = false;
+ for (;;) {
+ ParseResult result = parser_wrapper->getNextStatement();
+ if (result.condition == ParseResult::kSuccess) {
+ if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
+ quitting = true;
+ break;
+ }
- bool quitting = false;
- // A parse error should reset the parser. This is because the thrown quickstep
- // SqlError does not do the proper reset work of the YYABORT macro.
- bool reset_parser = false;
- for (;;) {
- ParseResult result = parser_wrapper->getNextStatement();
- if (result.condition == ParseResult::kSuccess) {
- if (result.parsed_statement->getStatementType() == ParseStatement::kQuit) {
- quitting = true;
- break;
- }
+ if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+ try {
+ quickstep::cli::executeCommand(
+ *result.parsed_statement,
+ *(query_processor->getDefaultDatabase()),
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &bus,
+ query_processor->getStorageManager(),
+ query_processor.get(),
+ stdout);
+ } catch (const quickstep::SqlError &sql_error) {
+ fprintf(stderr, "%s",
+ sql_error.formatMessage(*command_string).c_str());
+ reset_parser = true;
+ break;
+ }
+ continue;
+ }
- if (result.parsed_statement->getStatementType() == ParseStatement::kCommand) {
+ std::unique_ptr<QueryHandle> query_handle;
try {
- quickstep::cli::executeCommand(
- *result.parsed_statement,
- *(query_processor->getDefaultDatabase()),
- main_thread_client_id,
- foreman.getBusClientID(),
- &bus,
- query_processor->getStorageManager(),
- query_processor.get(),
- stdout);
+ query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
} catch (const quickstep::SqlError &sql_error) {
- fprintf(stderr, "%s",
- sql_error.formatMessage(*command_string).c_str());
+ fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
- continue;
- }
- std::unique_ptr<QueryHandle> query_handle;
- try {
- query_handle.reset(query_processor->generateQueryHandle(*result.parsed_statement));
- } catch (const quickstep::SqlError &sql_error) {
- fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ std::vector<QueryHandle*> query_handles;
+ query_handles.push_back(query_handle.get());
+ start = std::chrono::steady_clock::now();
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &query_handles,
+ &bus);
+
+ try {
+ QueryExecutionUtil::ReceiveQueryCompletionMessage(
+ main_thread_client_id, &bus);
+ end = std::chrono::steady_clock::now();
+
+ InputParserUtil::PrintAndDropOutputRelation(query_handle.get(), query_processor.get());
+ query_processor->saveCatalog();
+ std::chrono::duration<double, std::milli> time_ms = end - start;
+ printf("Time: %s ms\n",
+ quickstep::DoubleToStringWithSignificantDigits(
+ time_ms.count(), 3).c_str());
+ if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+ // TODO(harshad) - Allow user specified file instead of stdout.
+ foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+ stdout);
+ }
+ } catch (const std::exception &e) {
+ fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+ break;
+ }
+ } else {
+ if (result.condition == ParseResult::kError) {
+ fprintf(stderr, "%s", result.error_message.c_str());
+ }
reset_parser = true;
break;
}
+#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
+ // Profile only if profile_file_name flag is set
+ if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
+ started_profiling = true;
+ ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+ }
+#endif
+ }
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- std::vector<QueryHandle*> query_handles;
- query_handles.push_back(query_handle.get());
- start = std::chrono::steady_clock::now();
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- main_thread_client_id,
- foreman.getBusClientID(),
- &query_handles,
- &bus);
-
- try {
- QueryExecutionUtil::ReceiveQueryCompletionMessage(
- main_thread_client_id, &bus);
- end = std::chrono::steady_clock::now();
-
- const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
- if (query_result_relation) {
- PrintToScreen::PrintRelation(*query_result_relation,
- query_processor->getStorageManager(),
- stdout);
- PrintToScreen::PrintOutputSize(
- *query_result_relation,
- query_processor->getStorageManager(),
- stdout);
-
- DropRelation::Drop(*query_result_relation,
- query_processor->getDefaultDatabase(),
- query_processor->getStorageManager());
+ if (quitting) {
+ break;
+ } else if (reset_parser) {
+ parser_wrapper.reset(new SqlParserWrapper());
+ reset_parser = false;
+ }
+ }
+ } else {
+ std::vector<QueryHandle*> query_handles;
+ for (;;) {
+ bool end_of_input = false;
+ // A parse error should reset the parser. This is because the thrown quickstep
+ // SqlError does not do the proper reset work of the YYABORT macro.
+ bool reset_parser = false;
+ string *command_string = new string();
+ *command_string = line_reader.getNextCommand();
+ if (command_string->size() == 0) {
+ delete command_string;
+ end_of_input = true;
+ reset_parser = true;
+ } else {
+ if (quickstep::FLAGS_print_query) {
+ printf("\n%s\n", command_string->c_str());
+ }
+ parser_wrapper->feedNextBuffer(command_string);
+ end_of_input = false;
+ reset_parser = false;
+ }
+ for (;;) {
+ ParseResult result = parser_wrapper->getNextStatement();
+ // Check if the input has ended.
+ if (end_of_input && (result.condition == ParseResult::kEndOfInput ||
+ (result.condition == ParseResult::kSuccess &&
+ result.parsed_statement->getStatementType() ==
+ ParseStatement::kQuit))) {
+ if (!query_handles.empty()) {
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ &query_handles,
+ &bus);
+ try {
+ QueryExecutionUtil::ReceiveQueryCompletionMessage(
+ main_thread_client_id, &bus);
+
+ for (std::size_t i = 0; i < query_handles.size(); ++i) {
+ InputParserUtil::PrintAndDropOutputRelation(
+ query_handles[i], query_processor.get());
+ }
+ query_processor->saveCatalog();
+ if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+ // TODO(harshad) - Allow user specified file instead of stdout.
+ for (std::size_t i = 0; i < query_handles.size(); ++i) {
+ foreman.printWorkOrderProfilingResults(
+ query_handles[i]->query_id(), stdout);
+ }
+ }
+ } catch (const std::exception &e) {
+ fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
+ break;
+ }
}
-
- query_processor->saveCatalog();
- std::chrono::duration<double, std::milli> time_ms = end - start;
- printf("Time: %s ms\n",
- quickstep::DoubleToStringWithSignificantDigits(
- time_ms.count(), 3).c_str());
- if (quickstep::FLAGS_profile_and_report_workorder_perf) {
- // TODO(harshad) - Allow user specified file instead of stdout.
- foreman.printWorkOrderProfilingResults(query_handle->query_id(),
- stdout);
+ reset_parser = true;
+ break;
+ } else if (result.condition == ParseResult::kSuccess) {
+ if (result.parsed_statement->getStatementType() == ParseStatement::kSelect) {
+ std::unique_ptr<QueryHandle> query_handle;
+ try {
+ query_handle.reset(query_processor->generateQueryHandle(
+ *result.parsed_statement));
+ } catch (const quickstep::SqlError &sql_error) {
+ fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
+ reset_parser = true;
+ break;
+ }
+
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ query_handles.push_back(query_handle.release());
+ } else {
+ LOG(INFO) << "Only select queries are accepted in the workload";
}
- } catch (const std::exception &e) {
- fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
break;
- }
- } else {
- if (result.condition == ParseResult::kError) {
+ } else if (result.condition == ParseResult::kError) {
fprintf(stderr, "%s", result.error_message.c_str());
+ reset_parser = true;
+ break;
+ } else {
+ LOG(FATAL) << "Unhandled case";
+ break;
}
- reset_parser = true;
- break;
}
-#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
- // Profile only if profile_file_name flag is set
- if (!started_profiling && !quickstep::FLAGS_profile_file_name.empty()) {
- started_profiling = true;
- ProfilerStart(quickstep::FLAGS_profile_file_name.c_str());
+ if (end_of_input) {
+ break;
+ } else if (reset_parser) {
+ parser_wrapper.reset(new SqlParserWrapper());
+ reset_parser = false;
}
-#endif
- }
-
- if (quitting) {
- break;
- } else if (reset_parser) {
- parser_wrapper.reset(new SqlParserWrapper());
- reset_parser = false;
}
}
+
#ifdef QUICKSTEP_ENABLE_GOOGLE_PROFILER
if (started_profiling) {
ProfilerStop();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 104f9da..13b74e3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -242,6 +242,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_PriorityPolicyEnforcer
+ quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryexecution_QueryExecutionMessages_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index bb24baa..9801f60 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -101,11 +101,12 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
// queries.
DCHECK(mean_workorders_per_query.find(query_id) !=
mean_workorders_per_query.end());
- DCHECK_NE(mean_workorders_per_query[query_id], 0);
- current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
- query_id,
- 1 / static_cast<float>(mean_workorders_per_query[query_id]),
- denominator);
+ if (mean_workorders_per_query[query_id] != 0) {
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id,
+ 1 / static_cast<float>(mean_workorders_per_query[query_id]),
+ denominator);
+ }
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 8634842..ef92db9 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -451,8 +451,6 @@ class Learner {
for (const auto &element : mean_workorder_per_query) {
if (element.second != 0) {
denominator += 1/static_cast<float>(element.second);
- /*} else {
- return 0;*/
}
}
return denominator;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index f9a741d..0a15094 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -62,6 +62,7 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
}
} else {
// This query will have to wait.
+ LOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
waiting_queries_.push(query_handle);
return false;
}
@@ -166,7 +167,7 @@ void PriorityPolicyEnforcer::getWorkerMessages(
return;
}
DCHECK_GT(per_query_share, 0u);
- std::vector<std::size_t> finished_queries_ids;
+ std::unordered_map<std::size_t, bool> finished_queries_ids;
if (learner_->hasActiveQueries()) {
// Key = priority level. Value = Whether we have already checked the
@@ -196,8 +197,8 @@ void PriorityPolicyEnforcer::getWorkerMessages(
DLOG(INFO) << "No active queries in the learner at this point.";
return;
}
- for (const std::size_t finished_qid : finished_queries_ids) {
- removeQuery(finished_qid);
+ for (auto finished_qid_pair : finished_queries_ids) {
+ removeQuery(finished_qid_pair.first);
}
}
@@ -225,6 +226,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
}
// Remove the query from the learner.
learner_->removeQuery(query_id);
+ LOG(INFO) << "Query " << query_id << " removed. has queries? " << hasQueries();
+ // Admit waiting queries, if any.
}
bool PriorityPolicyEnforcer::admitQueries(
@@ -251,7 +254,7 @@ void PriorityPolicyEnforcer::recordTimeForWorkOrder(
WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
const std::size_t priority_level,
- std::vector<std::size_t> *finished_queries_ids) {
+ std::unordered_map<std::size_t, bool> *finished_queries_ids) {
// Key = query ID from the given priority level, value = whether we have
// checked this query earlier.
std::unordered_map<std::size_t, bool> checked_query_ids;
@@ -262,9 +265,28 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
// No query available at this time in this priority level.
return nullptr;
} else if (checked_query_ids.find(static_cast<std::size_t>(chosen_query_id)) != checked_query_ids.end()) {
- // We have already seen this query ID, try one more time.
- LOG(INFO) << "We have already seen this query, continue";
- continue;
+ // Find a query from the same priority level, but not present in the
+ // checked_query_ids map.
+ for (const std::size_t qid : priority_query_ids_[priority_level]) {
+ if (checked_query_ids.find(qid) == checked_query_ids.end() &&
+ finished_queries_ids->find(qid) == finished_queries_ids->end()) {
+ // Query not seen already.
+ QueryManager *chosen_query_manager = admitted_queries_[static_cast<std::size_t>(qid)].get();
+ DCHECK(chosen_query_manager != nullptr);
+ std::unique_ptr<WorkerMessage> next_worker_message(
+ chosen_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
+ if (next_worker_message != nullptr) {
+ // LOG(INFO) << "Selecting a work order from query " << qid << " instead";
+ return next_worker_message.release();
+ } else {
+ // This query doesn't have any WorkerMessage right now. Mark as checked.
+ checked_query_ids[qid] = true;
+ if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ (*finished_queries_ids)[static_cast<std::size_t>(qid)] = true;
+ }
+ }
+ }
+ }
} else {
// We haven't seen this query earlier. Check if it has any schedulable
// WorkOrder.
@@ -277,7 +299,7 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
// This query doesn't have any WorkerMessage right now. Mark as checked.
checked_query_ids[chosen_query_id] = true;
if (chosen_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
- finished_queries_ids->emplace_back(static_cast<std::size_t>(chosen_query_id));
+ (*finished_queries_ids)[static_cast<std::size_t>(chosen_query_id)] = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 281c066..eafb099 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -170,7 +170,7 @@ class PriorityPolicyEnforcer {
}
private:
- static constexpr std::size_t kMaxConcurrentQueries = 2;
+ static constexpr std::size_t kMaxConcurrentQueries = 100;
/**
* @brief Record the execution time for a finished WorkOrder.
@@ -188,7 +188,7 @@ class PriorityPolicyEnforcer {
*
* @param priority_level The priority level from which the query will be
* chosen.
- * @param finished_query_ids A vector of query IDs that have finished their
+ * @param finished_query_ids A map of query IDs that have finished their
* execution.
*
* @return A WorkerMessage. If no query can be chosen from this priority level,
@@ -196,7 +196,7 @@ class PriorityPolicyEnforcer {
**/
WorkerMessage *getNextWorkerMessageFromPriorityLevel(
const std::size_t priority_level,
- std::vector<std::size_t> *finished_queries_ids);
+ std::unordered_map<std::size_t, bool> *finished_queries_ids);
const tmb::client_id foreman_client_id_;
const std::size_t num_numa_nodes_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c53cfc9d/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 7278e2b..4ae085e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -286,7 +286,7 @@ class ProbabilityStore {
for (auto it = individual_probabilities_.begin();
it != individual_probabilities_.end();
++it) {
- DCHECK_LE(it->second.first, common_denominator_);
+ // DCHECK_LE(it->second.first, common_denominator_);
it->second.second = it->second.first / common_denominator_;
}
updateCumulativeProbabilities();