You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/08 13:21:50 UTC
[50/50] [abbrv] incubator-quickstep git commit: Merge branch 'master'
into query-manager-used-in-foreman
Merge branch 'master' into query-manager-used-in-foreman
Conflicts:
cli/QuickstepCli.cpp
query_execution/CMakeLists.txt
query_execution/QueryContext.proto
query_execution/QueryExecutionTypedefs.hpp
query_optimizer/ExecutionGenerator.hpp
storage/StorageManager.hpp
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bef0ae1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bef0ae1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bef0ae1d
Branch: refs/heads/query-manager-used-in-foreman
Commit: bef0ae1d83649b1d1a288f94e4be6bca28ea6037
Parents: 24c93ca 69c6551
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 8 08:15:49 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 8 08:15:49 2016 -0500
----------------------------------------------------------------------
CMakeLists.txt | 17 +-
DISCLAIMER | 11 +
README.md | 25 +-
catalog/CMakeLists.txt | 16 +-
catalog/Catalog.proto | 13 +-
catalog/CatalogRelation.cpp | 11 +
catalog/CatalogRelation.hpp | 52 +-
catalog/CatalogRelationStatistics.cpp | 49 +
catalog/CatalogRelationStatistics.hpp | 122 +
cli/CMakeLists.txt | 23 +-
cli/CommandExecutor.cpp | 189 +-
cli/CommandExecutor.hpp | 28 +-
cli/DefaultsConfigurator.hpp | 16 +
cli/InputParserUtil.cpp | 1 +
cli/PrintToScreen.cpp | 33 +
cli/PrintToScreen.hpp | 31 +
cli/QuickstepCli.cpp | 57 +-
cli/tests/CommandExecutorTestRunner.cpp | 5 +
cli/tests/command_executor/D.test | 35 +-
cli/tests/command_executor/Dt.test | 36 +-
.../aggregation/AggregationConcreteHandle.hpp | 105 +
expressions/aggregation/AggregationHandle.hpp | 15 +-
.../aggregation/AggregationHandleAvg.cpp | 9 +
.../aggregation/AggregationHandleAvg.hpp | 4 +
.../aggregation/AggregationHandleCount.cpp | 11 +
.../aggregation/AggregationHandleCount.hpp | 4 +
.../aggregation/AggregationHandleDistinct.hpp | 7 +
.../aggregation/AggregationHandleMax.cpp | 9 +
.../aggregation/AggregationHandleMax.hpp | 4 +
.../aggregation/AggregationHandleMin.cpp | 9 +
.../aggregation/AggregationHandleMin.hpp | 4 +
.../aggregation/AggregationHandleSum.cpp | 11 +-
.../aggregation/AggregationHandleSum.hpp | 4 +
expressions/aggregation/CMakeLists.txt | 2 +
.../tests/AggregationHandleAvg_unittest.cpp | 109 +
.../tests/AggregationHandleCount_unittest.cpp | 126 +-
.../tests/AggregationHandleMax_unittest.cpp | 122 +
.../tests/AggregationHandleMin_unittest.cpp | 121 +
.../tests/AggregationHandleSum_unittest.cpp | 124 +
parser/ParseBasicExpressions.cpp | 31 +
parser/ParseBasicExpressions.hpp | 77 +
parser/ParseExpression.hpp | 1 +
parser/ParseIndexProperties.hpp | 2 +-
parser/SqlLexer.lpp | 2 +
parser/SqlParser.ypp | 16 +
parser/preprocessed/SqlLexer_gen.cpp | 1173 +++----
parser/preprocessed/SqlLexer_gen.hpp | 2 +-
parser/preprocessed/SqlParser_gen.cpp | 2920 +++++++++---------
parser/preprocessed/SqlParser_gen.hpp | 126 +-
parser/tests/Index.test | 11 +
query_execution/AdmitRequestMessage.hpp | 2 +
query_execution/BlockLocator.cpp | 223 ++
query_execution/BlockLocator.hpp | 125 +
query_execution/CMakeLists.txt | 51 +
query_execution/Foreman.cpp | 3 +
query_execution/PolicyEnforcer.cpp | 9 +-
query_execution/QueryContext.cpp | 16 +-
query_execution/QueryContext.hpp | 57 +-
query_execution/QueryContext.proto | 20 +-
query_execution/QueryExecutionMessages.proto | 34 +
query_execution/QueryExecutionTypedefs.hpp | 20 +-
query_execution/QueryExecutionUtil.hpp | 45 +-
query_execution/tests/BlockLocator_unittest.cpp | 270 ++
query_optimizer/CMakeLists.txt | 16 +-
query_optimizer/ExecutionGenerator.cpp | 139 +-
query_optimizer/ExecutionGenerator.hpp | 15 +-
query_optimizer/ExecutionHeuristics.cpp | 127 +
query_optimizer/ExecutionHeuristics.hpp | 155 +
query_optimizer/PhysicalGenerator.cpp | 25 +
query_optimizer/QueryProcessor.hpp | 10 +
query_optimizer/cost_model/CMakeLists.txt | 33 +-
query_optimizer/cost_model/SimpleCostModel.cpp | 9 +
query_optimizer/cost_model/SimpleCostModel.hpp | 8 +-
.../cost_model/StarSchemaSimpleCostModel.cpp | 258 ++
.../cost_model/StarSchemaSimpleCostModel.hpp | 115 +
.../logical/SharedSubplanReference.cpp | 7 +-
.../logical/SharedSubplanReference.hpp | 34 +-
query_optimizer/physical/Aggregate.cpp | 14 +
.../physical/SharedSubplanReference.cpp | 5 +
.../physical/SharedSubplanReference.hpp | 34 +-
query_optimizer/resolver/CMakeLists.txt | 1 +
query_optimizer/resolver/Resolver.cpp | 84 +-
query_optimizer/rules/CMakeLists.txt | 20 +-
query_optimizer/rules/RuleHelper.cpp | 18 +-
.../StarSchemaHashJoinOrderOptimization.cpp | 309 ++
.../StarSchemaHashJoinOrderOptimization.hpp | 136 +
query_optimizer/strategy/OneToOne.cpp | 1 +
query_optimizer/tests/CMakeLists.txt | 17 +
.../tests/ExecutionGeneratorTestRunner.cpp | 17 +-
.../tests/ExecutionHeuristics_unittest.cpp | 301 ++
query_optimizer/tests/OptimizerTextTest.cpp | 14 +
.../tests/execution_generator/CMakeLists.txt | 2 +-
.../tests/execution_generator/Select.test | 101 +-
.../tests/logical_generator/Create.test | 4 +-
.../tests/logical_generator/Select.test | 106 +-
.../tests/physical_generator/Create.test | 6 +-
.../tests/physical_generator/Select.test | 181 +-
query_optimizer/tests/resolver/Create.test | 28 +-
query_optimizer/tests/resolver/Select.test | 259 +-
relational_operators/CMakeLists.txt | 8 +
relational_operators/HashJoinOperator.cpp | 7 +-
relational_operators/SelectOperator.cpp | 129 +-
relational_operators/SelectOperator.hpp | 115 +-
relational_operators/WorkOrder.proto | 98 +-
relational_operators/WorkOrderFactory.cpp | 362 +--
storage/AggregationOperationState.cpp | 112 +-
storage/AggregationOperationState.hpp | 9 +
storage/AggregationOperationState.proto | 5 +
storage/CMakeLists.txt | 116 +-
storage/DataExchange.proto | 31 +
storage/DataExchangerAsync.cpp | 165 +
storage/DataExchangerAsync.hpp | 97 +
storage/EvictionPolicy.hpp | 6 +
storage/FileManager.hpp | 9 +-
storage/FileManagerHdfs.cpp | 59 +-
storage/FileManagerPosix.cpp | 55 +-
storage/FileManagerPosix.hpp | 6 +-
storage/FileManagerWindows.cpp | 55 +-
storage/FileManagerWindows.hpp | 4 +-
storage/HashTable.hpp | 104 +
storage/HashTable.proto | 8 +
storage/HashTableFactory.hpp | 44 +-
storage/HashTablePool.hpp | 166 +
storage/PreloaderThread.cpp | 98 +-
storage/PreloaderThread.hpp | 30 +
storage/SMAIndexSubBlock.cpp | 48 +-
storage/SMAIndexSubBlock.hpp | 3 +-
.../SimpleScalarSeparateChainingHashTable.hpp | 10 +-
storage/StorageBlock.cpp | 5 +
storage/StorageBlock.hpp | 11 +-
storage/StorageConstants.hpp | 15 +-
storage/StorageManager.cpp | 473 ++-
storage/StorageManager.hpp | 182 +-
storage/tests/DataExchange_unittest.cpp | 240 ++
storage/tests/SMAIndexSubBlock_unittest.cpp | 52 +
storage/tests/StorageManager_unittest.cpp | 42 +
third_party/iwyu/iwyu_helper.py | 8 +-
transaction/AccessMode.hpp | 76 +-
transaction/CMakeLists.txt | 59 +-
transaction/CycleDetector.cpp | 120 +
transaction/CycleDetector.hpp | 83 +
transaction/DeadLockDetector.cpp | 177 ++
transaction/DeadLockDetector.hpp | 156 +
transaction/DirectedGraph.hpp | 56 +-
transaction/LockManager.cpp | 237 ++
transaction/LockManager.hpp | 128 +
transaction/LockTable.cpp | 22 +-
transaction/LockTable.hpp | 36 +-
transaction/ResourceId.hpp | 9 +-
transaction/StronglyConnectedComponents.cpp | 1 -
transaction/Transaction.cpp | 48 -
transaction/TransactionTable.cpp | 8 +-
transaction/TransactionTable.hpp | 11 +-
transaction/tests/AccessMode_unittest.cpp | 12 +-
transaction/tests/CycleDetector_unittest.cpp | 157 +
transaction/tests/DeadLockDetector_unittest.cpp | 96 +
transaction/tests/DirectedGraph_unittest.cpp | 113 +-
transaction/tests/LockRequest_unittest.cpp | 4 +-
transaction/tests/LockTable_unittest.cpp | 63 +-
transaction/tests/Lock_unittest.cpp | 13 +-
.../StronglyConnectedComponents_unittest.cpp | 15 +-
transaction/tests/TransactionTable_unittest.cpp | 102 +-
types/operations/Operation.proto | 9 +
.../operations/unary_operations/CMakeLists.txt | 22 +
.../unary_operations/SubstringOperation.cpp | 214 ++
.../unary_operations/SubstringOperation.hpp | 234 ++
.../unary_operations/UnaryOperation.cpp | 4 +
.../unary_operations/UnaryOperationFactory.cpp | 12 +
.../unary_operations/UnaryOperationID.cpp | 6 +-
.../unary_operations/UnaryOperationID.hpp | 3 +
utility/BloomFilter.hpp | 198 +-
utility/BloomFilter.proto | 30 +
utility/CMakeLists.txt | 31 +
utility/PlanVisualizer.cpp | 161 +
utility/PlanVisualizer.hpp | 94 +
utility/ScopedBuffer.hpp | 19 +-
utility/ShardedLockManager.hpp | 71 +-
utility/StringUtil.cpp | 14 +
utility/StringUtil.hpp | 48 +-
validate_cmakelists.py | 5 +-
180 files changed, 12093 insertions(+), 3286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/CMakeLists.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --cc cli/CommandExecutor.cpp
index f38121f,ddcd38f..cfb8c5b
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@@ -28,15 -29,28 +29,30 @@@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogRelationSchema.hpp"
+ #include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
+ #include "parser/ParseString.hpp"
+ #include "parser/SqlParserWrapper.hpp"
+ #include "query_execution/Foreman.hpp"
+ #include "query_optimizer/QueryHandle.hpp"
+ #include "query_optimizer/QueryPlan.hpp"
+ #include "query_optimizer/QueryProcessor.hpp"
+ #include "storage/StorageBlock.hpp"
+ #include "storage/StorageBlockInfo.hpp"
+ #include "storage/StorageManager.hpp"
+ #include "storage/TupleIdSequence.hpp"
+ #include "storage/TupleStorageSubBlock.hpp"
+ #include "types/Type.hpp"
+ #include "types/TypeID.hpp"
+ #include "types/TypedValue.hpp"
#include "utility/PtrVector.hpp"
- #include "utility/Macros.hpp"
#include "utility/SqlError.hpp"
- #include "gflags/gflags.h"
#include "glog/logging.h"
++#include "tmb/id_typedefs.h"
++
using std::fprintf;
using std::fputc;
using std::fputs;
@@@ -44,6 -58,6 +60,8 @@@ using std::size_t
using std::string;
using std::vector;
++namespace tmb { class MessageBus; }
++
namespace quickstep {
namespace cli {
namespace {
@@@ -162,10 -191,130 +195,143 @@@ void executeDescribeTable
}
}
+ /**
+ * @brief A helper function that executes a SQL query to obtain a scalar result.
+ */
-inline TypedValue executeQueryForSingleResult(const std::string &query_string,
- StorageManager *storage_manager,
- QueryProcessor *query_processor,
- SqlParserWrapper *parser_wrapper,
- Foreman *foreman) {
++inline TypedValue executeQueryForSingleResult(
++ const tmb::client_id main_thread_client_id,
++ const tmb::client_id foreman_client_id,
++ const std::string &query_string,
++ tmb::MessageBus *bus,
++ StorageManager *storage_manager,
++ QueryProcessor *query_processor,
++ SqlParserWrapper *parser_wrapper) {
+ parser_wrapper->feedNextBuffer(new std::string(query_string));
+
+ ParseResult result = parser_wrapper->getNextStatement();
+ DCHECK(result.condition == ParseResult::kSuccess);
+
+ // Generate the query plan.
+ std::unique_ptr<QueryHandle> query_handle(
+ query_processor->generateQueryHandle(*result.parsed_statement));
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ // Use foreman to execute the query plan.
- foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
++ /*foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
+ foreman->reconstructQueryContextFromProto(query_handle->getQueryContextProto());
+
+ foreman->start();
- foreman->join();
++ foreman->join();*/
++ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++ main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+
+ // Retrieve the scalar result from the result relation.
+ const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+ DCHECK(query_result_relation != nullptr);
+
+ TypedValue value;
+ {
+ std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+ DCHECK_EQ(1u, blocks.size());
+ BlockReference block = storage_manager->getBlock(blocks[0], *query_result_relation);
+ const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+ DCHECK_EQ(1, tuple_store.numTuples());
+ DCHECK_EQ(1u, tuple_store.getRelation().size());
+
+ if (tuple_store.isPacked()) {
+ value = tuple_store.getAttributeValueTyped(0, 0);
+ } else {
+ std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+ value = tuple_store.getAttributeValueTyped(*existence_map->begin(), 0);
+ }
+ value.ensureNotReference();
+ }
+
+ // Drop the result relation.
+ DropRelation::Drop(*query_result_relation,
+ query_processor->getDefaultDatabase(),
+ query_processor->getStorageManager());
+
+ return value;
+ }
+
-void executeAnalyze(QueryProcessor *query_processor,
- Foreman *foreman,
++void executeAnalyze(const tmb::client_id main_thread_client_id,
++ const tmb::client_id foreman_client_id,
++ MessageBus *bus,
++ QueryProcessor *query_processor,
+ FILE *out) {
+ const CatalogDatabase &database = *query_processor->getDefaultDatabase();
+ StorageManager *storage_manager = query_processor->getStorageManager();
+
+ std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
+ std::vector<std::reference_wrapper<const CatalogRelation>> relations(
+ database.begin(), database.end());
+
+ // Analyze each relation in the database.
+ for (const CatalogRelation &relation : relations) {
+ fprintf(out, "Analyzing %s ... ", relation.getName().c_str());
+ fflush(out);
+
+ CatalogRelation *mutable_relation =
+ query_processor->getDefaultDatabase()->getRelationByIdMutable(relation.getID());
+
+ // Get the number of distinct values for each column.
+ for (const CatalogAttribute &attribute : relation) {
+ std::string query_string = "SELECT COUNT(DISTINCT ";
+ query_string.append(attribute.getName());
+ query_string.append(") FROM ");
+ query_string.append(relation.getName());
+ query_string.append(";");
+
+ TypedValue num_distinct_values =
- executeQueryForSingleResult(query_string,
++ executeQueryForSingleResult(main_thread_client_id,
++ foreman_client_id,
++ query_string,
++ bus,
+ storage_manager,
+ query_processor,
- parser_wrapper.get(),
- foreman);
++ parser_wrapper.get());
+
+ DCHECK(num_distinct_values.getTypeID() == TypeID::kLong);
+ mutable_relation->getStatisticsMutable()->setNumDistinctValues(
+ attribute.getID(),
+ num_distinct_values.getLiteral<std::int64_t>());
+ }
+
+ // Get the number of tuples for the relation.
+ std::string query_string = "SELECT COUNT(*) FROM ";
+ query_string.append(relation.getName());
+ query_string.append(";");
+
+ TypedValue num_tuples =
- executeQueryForSingleResult(query_string,
++ executeQueryForSingleResult(main_thread_client_id,
++ foreman_client_id,
++ query_string,
++ bus,
+ storage_manager,
+ query_processor,
- parser_wrapper.get(),
- foreman);
++ parser_wrapper.get());
+
+ DCHECK(num_tuples.getTypeID() == TypeID::kLong);
+ mutable_relation->getStatisticsMutable()->setNumTuples(
+ num_tuples.getLiteral<std::int64_t>());
+
+ fprintf(out, "done\n");
+ fflush(out);
+ }
+ query_processor->markCatalogAltered();
+ query_processor->saveCatalog();
+ }
+
} // namespace
void executeCommand(const ParseStatement &statement,
const CatalogDatabase &catalog_database,
++ const tmb::client_id main_thread_client_id,
++ const tmb::client_id foreman_client_id,
++ MessageBus *bus,
+ StorageManager *storage_manager,
+ QueryProcessor *query_processor,
- Foreman *foreman,
FILE *out) {
const ParseCommand &command = static_cast<const ParseCommand &>(statement);
const PtrVector<ParseString> *arguments = command.arguments();
@@@ -178,6 -327,8 +344,9 @@@
} else {
executeDescribeTable(arguments, catalog_database, out);
}
+ } else if (command_str == C::kAnalyzeCommand) {
- executeAnalyze(query_processor, foreman, out);
++ executeAnalyze(
++ main_thread_client_id, foreman_client_id, bus, query_processor, out);
} else {
THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --cc cli/CommandExecutor.hpp
index 21eee6a,c819981..3435aeb
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@@ -21,13 -21,10 +21,14 @@@
#include <cstdio>
#include <string>
- #include "parser/ParseStatement.hpp"
- #include "utility/Macros.hpp"
++#include "tmb/id_typedefs.h"
+
using std::fprintf;
using std::fputc;
using std::string;
++namespace tmb { class MessageBus; }
++
namespace quickstep {
class CatalogDatabase;
@@@ -49,13 -50,19 +54,24 @@@ constexpr char kAnalyzeCommand[] = "\\a
/**
* @brief Executes the command by calling the command handler.
- *
+ *
* @param statement The parsed statement from the cli.
* @param catalog_database The catalog information about the current database.
- * @param out The stream where the output of the command has to be redirected to.
++ * @param main_thread_client_id The TMB client ID of the main thread.
++ * @param foreman_client_id The TMB client ID of the Foreman thread.
++ * @param bus A pointer to the TMB.
+ * @param storage_manager The current StorageManager.
+ * @param query_processor The query processor to generate plans for SQL queries.
+ * @param foreman The foreman to execute query plans.
+ * @param out The stream where the output of the command has to be redirected to.
*/
void executeCommand(const ParseStatement &statement,
const CatalogDatabase &catalog_database,
++ const tmb::client_id main_thread_client_id,
++ const tmb::client_id foreman_client_id,
++ tmb::MessageBus *bus,
+ StorageManager *storage_manager,
+ QueryProcessor *query_processor,
- Foreman *foreman,
FILE *out);
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --cc cli/QuickstepCli.cpp
index 66e58fe,558d6eb..6f954fe
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@@ -320,12 -323,7 +326,13 @@@ int main(int argc, char* argv[])
worker_client_ids,
worker_numa_nodes);
- foreman.setWorkerDirectory(&worker_directory);
+ Foreman foreman(main_thread_client_id,
+ &worker_directory,
+ &bus,
+ query_processor->getDefaultDatabase(),
+ query_processor->getStorageManager(),
- num_numa_nodes_covered);
++ -1,
++ num_numa_nodes_system);
// Start the worker threads.
for (Worker &worker : workers) {
@@@ -365,7 -365,11 +376,13 @@@
try {
quickstep::cli::executeCommand(
*result.parsed_statement,
- *(query_processor->getDefaultDatabase()), stdout);
+ *(query_processor->getDefaultDatabase()),
++ main_thread_client_id,
++ foreman.getBusClientID(),
++ &bus,
+ query_processor->getStorageManager(),
+ query_processor.get(),
- &foreman,
+ stdout);
} catch (const quickstep::SqlError &sql_error) {
fprintf(stderr, "%s",
sql_error.formatMessage(*command_string).c_str());
@@@ -385,20 -389,14 +402,25 @@@
}
DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- AdmitRequestMessage request_message(query_handle.get());
- TaggedMessage admit_tagged_message(
- &request_message, sizeof(request_message), kAdmitRequestMessage);
- foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
-
- foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
--
++ // TODO(harshad) - In the future when queries are not admitted
++ // immediately, calculate their waiting time separately.
++ LOG(INFO) << "Address of query handle in QuickstepCli: " << query_handle.get();
+ start = std::chrono::steady_clock::now();
- QueryExecutionUtil::SendTMBMessage(&bus,
- main_thread_client_id,
- foreman.getBusClientID(),
- std::move(admit_tagged_message));
++ const tmb::MessageBus::SendStatus send_status =
++ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++ main_thread_client_id,
++ foreman.getBusClientID(),
++ query_handle.get(),
++ &bus);
++ if (send_status != tmb::MessageBus::SendStatus::kOK) {
++ fprintf(stderr, "\nQuery %s could not be admitted to the system\n", command_string->c_str());
++ continue;
++ }
try {
- start = std::chrono::steady_clock::now();
- foreman.start();
- foreman.join();
+ const AnnotatedMessage annotated_msg =
+ bus.Receive(main_thread_client_id, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
end = std::chrono::steady_clock::now();
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --cc cli/tests/CommandExecutorTestRunner.cpp
index f5c37c3,9cd493e..794f7e1
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@@ -92,6 -87,9 +92,11 @@@ void CommandExecutorTestRunner::runTest
quickstep::cli::executeCommand(
*result.parsed_statement,
*(test_database_loader_.catalog_database()),
++ main_thread_client_id_,
++ foreman_->getBusClientID(),
++ &bus_,
+ test_database_loader_.storage_manager(),
+ nullptr,
- nullptr,
output_stream.file());
} else {
QueryHandle query_handle(optimizer_context.query_id());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --cc query_execution/AdmitRequestMessage.hpp
index e33b354,0000000..e2a1077
mode 100644,000000..100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@@ -1,73 -1,0 +1,75 @@@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class QueryHandle;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A message requesting a query or queries to be admitted to the system.
+ **/
+class AdmitRequestMessage {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_handles The handles of the queries requesting to be admitted
+ * to the system.
+ **/
+ explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
+ : query_handles_(query_handles) {}
+
+ /**
+ * @brief Constructor for requesting single query admission.
+ *
+ * @param query_handle The handle of the query requesting to be admitted.
+ **/
+ explicit AdmitRequestMessage(QueryHandle *query_handle) {
+ query_handles_.push_back(query_handle);
+ }
+
+ /**
+ * @brief Get the query handles from this message.
+ **/
+ const std::vector<QueryHandle*>& getQueryHandles() const {
++ LOG(INFO) << "Query handle in getQueryHandles(): " << query_handles_.front()
++ << " [0] " << query_handles_[0];
+ return query_handles_;
+ }
+
+ private:
+ std::vector<QueryHandle*> query_handles_;
+
+ DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --cc query_execution/CMakeLists.txt
index 9658c08,7d9d601..c8c8161
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@@ -20,10 -20,11 +20,13 @@@ QS_PROTOBUF_GENERATE_CPP(queryexecution
QueryExecutionMessages.proto)
# Declare micro-libs:
+add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+ if (ENABLE_DISTRIBUTED)
+ add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+ endif()
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
${queryexecution_QueryContext_proto_srcs}
@@@ -42,20 -43,43 +45,33 @@@ add_library(quickstep_queryexecution_Wo
add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
# Link dependencies:
+target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
+ quickstep_utility_Macros)
+ if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution_BlockLocator
+ glog
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_threading_Thread
+ quickstep_threading_ThreadUtil
+ quickstep_utility_Macros
+ tmb)
+ endif()
target_link_libraries(quickstep_queryexecution_Foreman
+ gflags_nothreads-static
glog
- gtest
- quickstep_catalog_CatalogDatabase
- quickstep_catalog_CatalogRelation
- quickstep_catalog_CatalogTypedefs
- quickstep_catalog_PartitionScheme
+ quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
- quickstep_queryexecution_QueryContext
- quickstep_queryexecution_QueryExecutionMessages_proto
- quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
- quickstep_queryexecution_WorkOrdersContainer
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
- quickstep_relationaloperators_RebuildWorkOrder
- quickstep_relationaloperators_RelationalOperator
- quickstep_relationaloperators_WorkOrder
- quickstep_storage_InsertDestination
- quickstep_storage_StorageBlock
- quickstep_storage_StorageBlockInfo
quickstep_threading_ThreadUtil
- quickstep_utility_DAG
+ quickstep_utility_EqualsAnyConstant
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_queryexecution_ForemanLite
@@@ -112,8 -128,7 +130,9 @@@ target_link_libraries(quickstep_queryex
quickstep_threading_ThreadIDBasedMap
tmb)
target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
++ quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_WorkerMessage
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_queryexecution_QueryManager
@@@ -179,7 -192,67 +198,37 @@@ target_link_libraries(quickstep_queryex
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryexecution_WorkerSelectionPolicy)
+ if (ENABLE_DISTRIBUTED)
+ target_link_libraries(quickstep_queryexecution
+ quickstep_queryexecution_BlockLocator)
+ endif()
+
# Tests:
+ if (ENABLE_DISTRIBUTED)
+ add_executable(BlockLocator_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp")
+ target_link_libraries(BlockLocator_unittest
+ gflags_nothreads-static
+ glog
+ gtest
+ quickstep_catalog_CatalogAttribute
+ quickstep_catalog_CatalogRelation
+ quickstep_queryexecution_BlockLocator
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryexecution_QueryExecutionUtil
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlock
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ tmb
+ ${LIBS})
+ add_test(BlockLocator_unittest BlockLocator_unittest)
+ endif()
+
-add_executable(Foreman_unittest
- "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
-target_link_libraries(Foreman_unittest
- glog
- gtest
- gtest_main
- quickstep_catalog_CatalogDatabase
- quickstep_catalog_CatalogRelation
- quickstep_catalog_CatalogTypedefs
- quickstep_queryexecution_Foreman
- quickstep_queryexecution_QueryContext
- quickstep_queryexecution_QueryContext_proto
- quickstep_queryexecution_QueryExecutionState
- quickstep_queryexecution_QueryExecutionTypedefs
- quickstep_queryexecution_WorkOrdersContainer
- quickstep_queryexecution_WorkerDirectory
- quickstep_queryexecution_WorkerMessage
- quickstep_queryoptimizer_QueryPlan
- quickstep_relationaloperators_RelationalOperator
- quickstep_relationaloperators_WorkOrder
- quickstep_storage_InsertDestination
- quickstep_storage_InsertDestination_proto
- quickstep_storage_StorageBlock
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageManager
- quickstep_utility_DAG
- quickstep_utility_Macros
- tmb)
-add_test(Foreman_unittest Foreman_unittest)
-
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
target_link_libraries(QueryManager_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --cc query_execution/Foreman.cpp
index 3609120,304c429..6cec70a
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@@ -97,40 -58,161 +97,43 @@@ void Foreman::run()
// We can pin the foreman thread to a CPU if specified.
ThreadUtil::BindToCPU(cpu_id_);
}
- initializeState();
-
- DEBUG_ASSERT(query_dag_ != nullptr);
- const dag_node_index dag_size = query_dag_->size();
-
- // Collect all the workorders from all the relational operators in the DAG.
- for (dag_node_index index = 0; index < dag_size; ++index) {
- if (checkAllBlockingDependenciesMet(index)) {
- query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
- processOperator(index, false);
- }
- }
-
- // Dispatch the WorkOrders generated so far.
- dispatchWorkerMessages(0, 0);
-}
-
-void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
- const size_t worker_thread_index) {
- query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
- // As the given worker finished executing a WorkOrder, decrement its number
- // of queued WorkOrders.
- workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
- // Check if new work orders are available and fetch them if so.
- fetchNormalWorkOrders(op_index);
-
- if (checkRebuildRequired(op_index)) {
- if (checkNormalExecutionOver(op_index)) {
- if (!checkRebuildInitiated(op_index)) {
- if (initiateRebuild(op_index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(op_index);
- } else {
- // Rebuild under progress.
- }
- } else if (checkRebuildOver(op_index)) {
- // Rebuild was under progress and now it is over.
- markOperatorFinished(op_index);
- }
- } else {
- // Normal execution under progress for this operator.
- }
- } else if (checkOperatorExecutionOver(op_index)) {
- // Rebuild not required for this operator and its normal execution is
- // complete.
- markOperatorFinished(op_index);
- }
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- // Process the dependent operator (of the operator whose WorkOrder
- // was just executed) for which all the dependencies have been met.
- processOperator(dependent_op_index, true);
- }
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
- const size_t worker_thread_index) {
- query_exec_state_->decrementNumRebuildWorkOrders(op_index);
- workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
- if (checkRebuildOver(op_index)) {
- markOperatorFinished(op_index);
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- dispatchWorkerMessages(worker_thread_index, op_index);
-}
-
-void Foreman::processDataPipelineMessage(const dag_node_index op_index,
- const block_id block,
- const relation_id rel_id) {
- for (const dag_node_index consumer_index :
- output_consumers_[op_index]) {
- // Feed the streamed block to the consumer. Note that 'output_consumers_'
- // only contain those dependents of operator with index = op_index which are
- // eligible to receive streamed input.
- query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
- // Because of the streamed input just fed, check if there are any new
- // WorkOrders available and if so, fetch them.
- fetchNormalWorkOrders(consumer_index);
- }
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- // TODO(zuyu): Improve the data locality for the next WorkOrder.
- dispatchWorkerMessages(0, op_index);
-}
-
-void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
- RelationalOperator *op =
- query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
- op->receiveFeedbackMessage(msg);
-}
-
-void Foreman::run() {
- // Initialize before for Foreman eventloop.
- initialize();
// Event loop
- while (!query_exec_state_->hasQueryExecutionFinished()) {
+ for (;;) {
// Receive() causes this thread to sleep until next message is received.
- AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
+ const AnnotatedMessage annotated_msg =
+ bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::WorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
+ const tmb::message_type_id message_type = tagged_message.message_type();
+ switch (message_type) {
+ case kCatalogRelationNewBlockMessage: // Fall through
+ case kDataPipelineMessage:
+ case kRebuildWorkOrderCompleteMessage:
+ case kWorkOrderCompleteMessage:
+ case kWorkOrderFeedbackMessage:
+ case kWorkOrdersAvailableMessage: {
+ policy_enforcer_->processMessage(tagged_message);
break;
}
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
+ case kAdmitRequestMessage: {
+ const AdmitRequestMessage *msg =
+ static_cast<const AdmitRequestMessage *>(tagged_message.message());
+ const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+
++ LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() <<
++ " [0]: " << query_handles[0];
+ DCHECK(!query_handles.empty());
+ bool all_queries_admitted = true;
+ if (query_handles.size() == 1u) {
++ LOG(INFO) << "Address of query handle in foreman: " << query_handles.front();
+ all_queries_admitted =
+ policy_enforcer_->admitQuery(query_handles.front());
+ } else {
+ all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+ }
+ if (!all_queries_admitted) {
+ LOG(WARNING) << "The scheduler could not admit all the queries";
+ // TODO(harshad) - Inform the main thread about the failure.
}
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --cc query_execution/PolicyEnforcer.cpp
index 1ee1df9,0000000..15b90a1
mode 100644,000000..100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@@ -1,177 -1,0 +1,180 @@@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcer.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryManager.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/WorkOrder.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+ if (admitted_queries_.size() < kMaxConcurrentQueries) {
+ // Ok to admit the query.
+ const std::size_t query_id = query_handle->query_id();
+ if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
- admitted_queries_[query_id].reset(
- new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
++ admitted_queries_[query_id].reset(new QueryManager(foreman_client_id_,
++ num_numa_nodes_,
++ query_handle,
++ catalog_database_,
++ storage_manager_,
++ bus_));
+ return true;
+ } else {
+ LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+ return false;
+ }
+ } else {
+ // This query will have to wait.
+ waiting_queries_.push(query_handle);
+ return false;
+ }
+}
+
+void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
+ // TODO(harshad) : Provide processXMessage() public functions in
+ // QueryManager, so that we need to extract the message from the
+ // TaggedMessage only once.
+ std::size_t query_id;
+ switch (tagged_message.message_type()) {
+ case kWorkOrderCompleteMessage: // Fall through.
+ case kRebuildWorkOrderCompleteMessage: {
+ serialization::WorkOrderCompletionMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kCatalogRelationNewBlockMessage: {
+ serialization::CatalogRelationNewBlockMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kDataPipelineMessage: {
+ serialization::DataPipelineMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrdersAvailableMessage: {
+ serialization::WorkOrdersAvailableMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(),
+ tagged_message.message_bytes()));
+ query_id = proto.query_id();
+ break;
+ }
+ case kWorkOrderFeedbackMessage: {
+ // TODO(harshad) Add query ID to FeedbackMessage.
+ WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
+ query_id = msg.header().query_id;
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
+ }
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ const QueryManager::QueryStatusCode return_code =
+ admitted_queries_[query_id]->processMessage(tagged_message);
+ if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+ removeQuery(query_id);
+ if (!waiting_queries_.empty()) {
+ // Admit the earliest waiting query.
+ QueryHandle *new_query = waiting_queries_.front();
+ waiting_queries_.pop();
+ admitQuery(new_query);
+ }
+ }
+}
+
+void PolicyEnforcer::getWorkerMessages(
+ std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
+ // Iterate over admitted queries until either there are no more
+ // messages available, or the maximum number of messages has
+ // been collected.
+ DCHECK(worker_messages->empty());
+ // TODO(harshad) - Make this function generic enough so that it
+ // works well when multiple queries are getting executed.
+ std::size_t per_query_share = 0;
+ if (!admitted_queries_.empty()) {
+ per_query_share = kMaxNumWorkerMessages / admitted_queries_.size();
+ } else {
+ LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+ return;
+ }
+ DCHECK_GT(per_query_share, 0u);
+ std::vector<std::size_t> finished_queries_ids;
+
+ for (const auto &admitted_query_info : admitted_queries_) {
+ QueryManager *curr_query_manager = admitted_query_info.second.get();
+ DCHECK(curr_query_manager != nullptr);
+ std::size_t messages_collected_curr_query = 0;
+ while (messages_collected_curr_query < per_query_share) {
+ WorkerMessage *next_worker_message =
+ curr_query_manager->getNextWorkerMessage(0, -1);
+ if (next_worker_message != nullptr) {
+ ++messages_collected_curr_query;
+ worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
+ } else {
+ // No more work orders from the current query at this time.
+ // Check if the query's execution is over.
+ if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+ // If the query has been executed, remove it.
+ finished_queries_ids.push_back(admitted_query_info.first);
+ }
+ break;
+ }
+ }
+ }
+ for (std::size_t finished_qid : finished_queries_ids) {
+ removeQuery(finished_qid);
+ }
+}
+
+void PolicyEnforcer::removeQuery(const std::size_t query_id) {
+ DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+ if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
+ LOG(WARNING) << "Removing query with ID " << query_id
+ << " that hasn't finished its execution";
+ }
+ admitted_queries_.erase(query_id);
+}
+
+bool PolicyEnforcer::admitQueries(
+ const std::vector<QueryHandle *> &query_handles) {
+ for (QueryHandle *curr_query : query_handles) {
+ if (!admitQuery(curr_query)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryContext.hpp
index e170494,7d5628d..83627bc
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@@ -406,15 -460,9 +460,16 @@@ class QueryContext
return update_groups_[id];
}
+ /**
+ * @brief Get the ID of this query.
+ **/
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
private:
std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
+ std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --cc query_execution/QueryContext.proto
index 5632482,b37286c..98cd0b6
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@@ -42,16 -43,15 +43,17 @@@ message QueryContext
}
repeated AggregationOperationState aggregation_states = 1;
- repeated HashTable join_hash_tables = 2;
- repeated InsertDestination insert_destinations = 3;
- repeated Predicate predicates = 4;
- repeated ScalarGroup scalar_groups = 5;
- repeated SortConfiguration sort_configs = 6;
- repeated Tuple tuples = 7;
- repeated GeneratorFunctionHandle generator_functions = 8;
+ repeated BloomFilter bloom_filters = 2;
+ repeated GeneratorFunctionHandle generator_functions = 3;
+ repeated HashTable join_hash_tables = 4;
+ repeated InsertDestination insert_destinations = 5;
+ repeated Predicate predicates = 6;
+ repeated ScalarGroup scalar_groups = 7;
+ repeated SortConfiguration sort_configs = 8;
+ repeated Tuple tuples = 9;
// NOTE(zuyu): For UpdateWorkOrder only.
- repeated UpdateGroup update_groups = 9;
+ repeated UpdateGroup update_groups = 10;
+
- required uint64 query_id = 10;
++ required uint64 query_id = 11;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionMessages.proto
index f8628fa,15803cf..9d9a9e5
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@@ -44,5 -45,34 +48,35 @@@ message DataPipelineMessage
message WorkOrdersAvailableMessage {
required uint64 operator_index = 1;
+ required uint64 query_id = 2;
}
+
+ // BlockLocator related messages.
+ message BlockDomainRegistrationMessage {
+ // Format IP:Port, i.e., "0.0.0.0:0".
+ required string domain_network_address = 1;
+ }
+
+ // Used for RegistrationResponse, Unregistration, and FailureReport.
+ message BlockDomainMessage {
+ required uint32 block_domain = 1;
+ }
+
+ // Used when StorageManager loads or evicts a block or a blob from its buffer
+ // pool.
+ message BlockLocationMessage {
+ required fixed64 block_id = 1;
+ required uint32 block_domain = 2;
+ }
+
+ message BlockMessage {
+ required fixed64 block_id = 1;
+ }
+
+ message LocateBlockResponseMessage {
+ repeated uint32 block_domains = 1;
+ }
+
+ message GetPeerDomainNetworkAddressesResponseMessage {
+ repeated string domain_network_addresses = 1;
+ }
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionTypedefs.hpp
index 6ba792e,fc253bc..fa97158
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@@ -55,7 -56,8 +56,9 @@@ using ClientIDMap = ThreadIDBasedMap<cl
'a',
'p'>;
+ // We sort the following message types in the order of a life cycle of a query.
enum QueryExecutionMessageType : message_type_id {
++ kAdmitRequestMessage, // Requesting a query (or queries) to be admitted.
kWorkOrderMessage, // From Foreman to Worker.
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
@@@ -65,9 -67,21 +68,22 @@@
// their corresponding RelationalOperators.
kRebuildWorkOrderMessage, // From Foreman to Worker.
kRebuildWorkOrderCompleteMessage, // From Worker to Foreman.
- kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers.
++ kWorkloadCompletionMessage, // From Foreman to main thread.
+ kPoisonMessage, // From the main thread to Foreman and workers.
- kAdmitRequestMessage, // Requesting a query (or queries) to be admitted.
- kWorkloadCompletionMessage // From Foreman to main thread.
+
+ #ifdef QUICKSTEP_DISTRIBUTED
+ // BlockLocator related messages, sorted in a life cycle of StorageManager
+ // with a unique block domain.
+ kBlockDomainRegistrationMessage, // From Worker to BlockLocator.
+ kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker.
+ kAddBlockLocationMessage, // From StorageManager to BlockLocator.
+ kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
+ kLocateBlockMessage, // From StorageManager to BlockLocator.
+ kLocateBlockResponseMessage, // From BlockLocator to StorageManager.
+ kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
+ kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
+ kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
+ #endif
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionUtil.hpp
index 0cd0a4e,a8b6a38..267bbe6
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@@ -17,11 -17,9 +17,12 @@@
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
+#include <memory>
#include <utility>
++#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
#include "utility/Macros.hpp"
#include "tmb/address.h"
@@@ -31,6 -29,6 +32,8 @@@
namespace quickstep {
++class QueryHandle;
++
/**
* @brief A static class for reusable methods in query_execution module.
**/
@@@ -62,27 -60,6 +65,67 @@@ class QueryExecutionUtil
std::move(tagged_message));
}
- static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
++ /**
++ * @brief Construct and send an AdmitRequestMessage from a given sender to a
++ * given recipient.
++ *
++ * @param sender_id The TMB client ID of the sender.
++ * @param receiver_id The TMB client ID of the receiver.
++ * @param query_handle The QueryHandle used in the AdmitRequestMessage.
++ * @param bus A pointer to the TMB.
++ * @param tagged_message A moved from reference to the tagged message.
++ *
++ * @return A status code indicating the result of the message delivery.
++ * The caller should ensure that the status is SendStatus::kOK.
++ **/
++ static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
++ const tmb::client_id sender_id,
++ const tmb::client_id receiver_id,
++ QueryHandle *query_handle,
++ MessageBus *bus) {
++ LOG(INFO) << "Address of query handle in QExecUtil: " << query_handle;
++ std::unique_ptr<AdmitRequestMessage> request_message(new AdmitRequestMessage(query_handle));
++ const std::vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
++ LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() << " [0]: " << query_handles[0];
++ const std::size_t size_of_request_msg = sizeof(*request_message);
++ TaggedMessage admit_tagged_message(
++ request_message.release(), size_of_request_msg, kAdmitRequestMessage);
++
++ return QueryExecutionUtil::SendTMBMessage(
++ bus, sender_id, receiver_id, std::move(admit_tagged_message));
++ }
++
++ /**
++ * @brief Broadcast a poison message from a given sender.
++ *
++ * @note This message will be received by all the clients that have registered
++ * as recipients of the poison message type.
++ *
++ * @param sender_id The TMB client ID of the sender.
++ * @param bus A pointer to the TMB.
++ **/
++ static void BroadcastPoisonMessage(const tmb::client_id sender_id,
++ tmb::MessageBus *bus) {
+ // Terminate all threads.
+ // The sender thread broadcasts a poison message to the workers and foreman.
+ // Each worker dies after receiving the poison message. The order of workers'
+ // death is irrelevant.
+ MessageStyle style;
+ style.Broadcast(true);
+ Address address;
+ address.All(true);
+ std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+ TaggedMessage poison_tagged_message(poison_message.get(),
+ sizeof(*poison_message),
+ kPoisonMessage);
+
+ const tmb::MessageBus::SendStatus send_status = bus->Send(
+ sender_id, address, style, std::move(poison_tagged_message));
+ CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+ "Broadcast poison message from sender with TMB client ID " << sender_id
+ << " failed";
+ }
+
private:
/**
* @brief Constructor. Made private to avoid instantiation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --cc query_optimizer/ExecutionGenerator.hpp
index 0056cbf,0630bca..9639cf1
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@@ -100,8 -103,8 +103,9 @@@ class ExecutionGenerator
: optimizer_context_(DCHECK_NOTNULL(optimizer_context)),
query_handle_(DCHECK_NOTNULL(query_handle)),
execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
- query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())) {
+ query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
+ execution_heuristics_(new ExecutionHeuristics()) {
+ query_context_proto_->set_query_id(query_handle->query_id());
#ifdef QUICKSTEP_DISTRIBUTED
catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
#endif
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --cc query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 930087a,56b53ba..59ba5c1
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@@ -94,20 -90,13 +94,23 @@@ void ExecutionGeneratorTestRunner::runT
physical_generator.generatePlan(
logical_generator.generatePlan(*result.parsed_statement));
execution_generator.generatePlan(physical_plan);
- foreman_->setQueryPlan(
- query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
- AdmitRequestMessage request_message(&query_handle);
- TaggedMessage admit_tagged_message(
- &request_message, sizeof(request_message), kAdmitRequestMessage);
- QueryExecutionUtil::SendTMBMessage(&bus_,
- main_thread_client_id_,
- foreman_->getBusClientID(),
- std::move(admit_tagged_message));
- foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
++ const tmb::MessageBus::SendStatus send_status =
++ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++ main_thread_client_id_,
++ foreman_->getBusClientID(),
++ &query_handle,
++ &bus_);
++ if (send_status != tmb::MessageBus::SendStatus::kOK) {
++ std::fprintf(stderr, "\nQuery %s could not be admitted to the system\n", input.c_str());
++ continue;
++ }
- foreman_->start();
- foreman_->join();
+ // Receive workload completion message from Foreman.
+ const AnnotatedMessage annotated_msg =
+ bus_.Receive(main_thread_client_id_, 0, true);
+ const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+ DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------