You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:30 UTC
[38/50] incubator-quickstep git commit: Transferred the ownership of
QueryHandle to QueryManager.
Transferred the ownership of QueryHandle to QueryManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c608e99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c608e99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c608e99e
Branch: refs/heads/quickstep_partition_parser_support
Commit: c608e99eff883afacafa2f0fefce99a0513f9963
Parents: 57730e4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 20 12:04:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Nov 27 18:00:34 2016 -0800
----------------------------------------------------------------------
cli/CommandExecutor.cpp | 30 +++++++------
cli/QuickstepCli.cpp | 46 ++++++++++----------
cli/tests/CommandExecutorTestRunner.cpp | 31 +++++++------
query_execution/QueryManagerBase.hpp | 7 ++-
.../tests/QueryManagerSingleNode_unittest.cpp | 8 ++--
.../DistributedExecutionGeneratorTestRunner.cpp | 34 ++++++++-------
.../tests/ExecutionGeneratorTestRunner.cpp | 35 ++++++++-------
7 files changed, 99 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 4ab32de..3c510e7 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -213,25 +213,27 @@ inline std::vector<TypedValue> executeQueryForSingleRow(
DCHECK(result.condition == ParseResult::kSuccess);
const ParseStatement &statement = *result.parsed_statement;
+ const CatalogRelation *query_result_relation = nullptr;
- // Generate the query plan.
- std::unique_ptr<QueryHandle> query_handle(
- std::make_unique<QueryHandle>(query_processor->query_id(),
- main_thread_client_id,
- statement.getPriority()));
- query_processor->generateQueryHandle(statement, query_handle.get());
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
- // Use foreman to execute the query plan.
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+ {
+ // Generate the query plan.
+ auto query_handle =
+ std::make_unique<QueryHandle>(query_processor->query_id(),
+ main_thread_client_id,
+ statement.getPriority());
+ query_processor->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ query_result_relation = query_handle->getQueryResultRelation();
+ DCHECK(query_result_relation != nullptr);
+
+ // Use foreman to execute the query plan.
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id, foreman_client_id, query_handle.release(), bus);
+ }
QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus);
// Retrieve the scalar result from the result relation.
- const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
- DCHECK(query_result_relation != nullptr);
-
std::vector<TypedValue> values;
{
std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index f4816a8..9db7577 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -341,38 +341,41 @@ int main(int argc, char* argv[]) {
continue;
}
- std::unique_ptr<QueryHandle> query_handle(
- std::make_unique<QueryHandle>(query_processor->query_id(),
- main_thread_client_id,
- statement.getPriority()));
+ const std::size_t query_id = query_processor->query_id();
+ const CatalogRelation *query_result_relation = nullptr;
+ std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer;
+
try {
+ auto query_handle = std::make_unique<QueryHandle>(query_id,
+ main_thread_client_id,
+ statement.getPriority());
query_processor->generateQueryHandle(statement, query_handle.get());
+ DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+ if (quickstep::FLAGS_visualize_execution_dag) {
+ dag_visualizer =
+ std::make_unique<quickstep::ExecutionDAGVisualizer>(*query_handle->getQueryPlanMutable());
+ }
+
+ query_result_relation = query_handle->getQueryResultRelation();
+
+ start = std::chrono::steady_clock::now();
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id,
+ foreman.getBusClientID(),
+ query_handle.release(),
+ &bus);
} catch (const quickstep::SqlError &sql_error) {
fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str());
reset_parser = true;
break;
}
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer;
- if (quickstep::FLAGS_visualize_execution_dag) {
- dag_visualizer.reset(
- new quickstep::ExecutionDAGVisualizer(*query_handle->getQueryPlanMutable()));
- }
-
- start = std::chrono::steady_clock::now();
- QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
- main_thread_client_id,
- foreman.getBusClientID(),
- query_handle.get(),
- &bus);
-
try {
QueryExecutionUtil::ReceiveQueryCompletionMessage(
main_thread_client_id, &bus);
end = std::chrono::steady_clock::now();
- const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
&storage_manager,
@@ -394,12 +397,11 @@ int main(int argc, char* argv[]) {
time_ms.count(), 3).c_str());
if (quickstep::FLAGS_profile_and_report_workorder_perf) {
// TODO(harshad) - Allow user specified file instead of stdout.
- foreman.printWorkOrderProfilingResults(query_handle->query_id(),
- stdout);
+ foreman.printWorkOrderProfilingResults(query_id, stdout);
}
if (quickstep::FLAGS_visualize_execution_dag) {
const auto &profiling_stats =
- foreman.getWorkOrderProfilingResults(query_handle->query_id());
+ foreman.getWorkOrderProfilingResults(query_id);
dag_visualizer->bindProfilingStats(profiling_stats);
std::cerr << "\n" << dag_visualizer->toDOT() << "\n";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp
index 41cc9da..e352b24 100644
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@ -20,6 +20,7 @@
#include "cli/tests/CommandExecutorTestRunner.hpp"
#include <cstdio>
+#include <memory>
#include <set>
#include <string>
#include <utility>
@@ -88,28 +89,26 @@ void CommandExecutorTestRunner::runTestCase(
nullptr,
output_stream.file());
} else {
- QueryHandle query_handle(0 /* query_id */, main_thread_client_id_);
- O::OptimizerContext optimizer_context;
-
- optimizer_.generateQueryHandle(parse_statement,
- test_database_loader_.catalog_database(),
- &optimizer_context,
- &query_handle);
-
- AdmitRequestMessage request_message(&query_handle);
- TaggedMessage admit_tagged_message(
- &request_message, sizeof(request_message), kAdmitRequestMessage);
- QueryExecutionUtil::SendTMBMessage(&bus_,
- main_thread_client_id_,
- foreman_->getBusClientID(),
- std::move(admit_tagged_message));
+ const CatalogRelation *query_result_relation = nullptr;
+ {
+ auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);
+ O::OptimizerContext optimizer_context;
+
+ optimizer_.generateQueryHandle(parse_statement,
+ test_database_loader_.catalog_database(),
+ &optimizer_context,
+ query_handle.get());
+ query_result_relation = query_handle->getQueryResultRelation();
+
+ QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+ main_thread_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+ }
// Receive workload completion message from Foreman.
const AnnotatedMessage annotated_msg =
bus_.Receive(main_thread_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_msg.tagged_message;
DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
- const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
test_database_loader_.storage_manager(),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index a274742..ddb76d5 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -26,6 +26,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -34,8 +35,6 @@
namespace quickstep {
-class QueryHandle;
-
/** \addtogroup QueryExecution
* @{
*/
@@ -77,7 +76,7 @@ class QueryManagerBase {
* @brief Get the query handle.
**/
const QueryHandle* query_handle() const {
- return query_handle_;
+ return query_handle_.get();
}
/**
@@ -259,7 +258,7 @@ class QueryManagerBase {
return query_exec_state_->hasRebuildInitiated(index);
}
- const QueryHandle *query_handle_;
+ std::unique_ptr<QueryHandle> query_handle_;
const std::size_t query_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index f65ec53..6ec6521 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -234,14 +234,14 @@ class QueryManagerTest : public ::testing::Test {
db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
storage_manager_.reset(new StorageManager("./"));
bus_.Initialize();
- query_handle_.reset(new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */));
+ query_handle_ = new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */);
query_plan_ = query_handle_->getQueryPlanMutable();
query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
}
inline void constructQueryManager() {
query_manager_.reset(new QueryManagerSingleNode(
- 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
+ 0, 1, query_handle_, db_.get(), storage_manager_.get(), &bus_));
}
inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
@@ -291,8 +291,8 @@ class QueryManagerTest : public ::testing::Test {
unique_ptr<CatalogDatabase> db_;
unique_ptr<StorageManager> storage_manager_;
- QueryPlan *query_plan_;
- unique_ptr<QueryHandle> query_handle_;
+ QueryPlan *query_plan_; // Owned by 'query_handle_'.
+ QueryHandle* query_handle_; // Owned by 'query_manager_'.
unique_ptr<QueryManagerSingleNode> query_manager_;
MessageBusImpl bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2351dcd..5100651 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -160,37 +160,39 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
const ParseStatement &parse_statement = *result.parsed_statement;
std::printf("%s\n", parse_statement.toString().c_str());
+
+ const CatalogRelation *query_result_relation = nullptr;
try {
OptimizerContext optimizer_context;
- QueryHandle query_handle(query_id_++, cli_id_);
+ auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
optimizer_.generateQueryHandle(parse_statement,
test_database_loader_->catalog_database(),
&optimizer_context,
- &query_handle);
+ query_handle.get());
+ query_result_relation = query_handle->getQueryResultRelation();
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
cli_id_,
foreman_->getBusClientID(),
- &query_handle,
+ query_handle.release(),
&bus_);
-
- const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
- DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
-
- const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
- if (query_result_relation) {
- PrintToScreen::PrintRelation(*query_result_relation,
- test_database_loader_->storage_manager(),
- output_stream.file());
- DropRelation::Drop(*query_result_relation,
- test_database_loader_->catalog_database(),
- test_database_loader_->storage_manager());
- }
} catch (const SqlError &error) {
*output = error.formatMessage(input);
break;
}
+
+ const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+ DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+ if (query_result_relation) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ test_database_loader_->storage_manager(),
+ output_stream.file());
+ DropRelation::Drop(*query_result_relation,
+ test_database_loader_->catalog_database(),
+ test_database_loader_->storage_manager());
+ }
}
if (output->empty()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 06397d4..ee9bee7 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -20,6 +20,7 @@
#include "query_optimizer/tests/ExecutionGeneratorTestRunner.hpp"
#include <cstdio>
+#include <memory>
#include <set>
#include <string>
@@ -71,37 +72,39 @@ void ExecutionGeneratorTestRunner::runTestCase(
} else {
const ParseStatement &parse_statement = *result.parsed_statement;
std::printf("%s\n", parse_statement.toString().c_str());
+
+ const CatalogRelation *query_result_relation = nullptr;
try {
- QueryHandle query_handle(0 /* query_id */, main_thread_client_id_);
OptimizerContext optimizer_context;
+ auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);
optimizer_.generateQueryHandle(parse_statement,
test_database_loader_.catalog_database(),
&optimizer_context,
- &query_handle);
+ query_handle.get());
+ query_result_relation = query_handle->getQueryResultRelation();
QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_,
foreman_->getBusClientID(),
- &query_handle,
+ query_handle.release(),
&bus_);
-
- QueryExecutionUtil::ReceiveQueryCompletionMessage(
- main_thread_client_id_, &bus_);
-
- const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
- if (query_result_relation) {
- PrintToScreen::PrintRelation(*query_result_relation,
- test_database_loader_.storage_manager(),
- output_stream.file());
- DropRelation::Drop(*query_result_relation,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager());
- }
} catch (const SqlError &error) {
*output = error.formatMessage(input);
break;
}
+
+ QueryExecutionUtil::ReceiveQueryCompletionMessage(
+ main_thread_client_id_, &bus_);
+
+ if (query_result_relation) {
+ PrintToScreen::PrintRelation(*query_result_relation,
+ test_database_loader_.storage_manager(),
+ output_stream.file());
+ DropRelation::Drop(*query_result_relation,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager());
+ }
}
}