You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/06 19:33:36 UTC
[1/2] incubator-quickstep git commit: Query ID added in CreateTable,
Index, Delete and DestroyHash
Repository: incubator-quickstep
Updated Branches:
refs/heads/query-id-operator-workorder 06d35016a -> 1290dd703
Query ID added in CreateTable, Index, Delete and DestroyHash
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/58d5ccbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/58d5ccbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/58d5ccbb
Branch: refs/heads/query-id-operator-workorder
Commit: 58d5ccbb46a4e456c2361397f09a7fc33c49bf99
Parents: 06d3501
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 5 23:09:51 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sun Jun 5 23:09:51 2016 -0500
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 11 +++++++----
relational_operators/CreateIndexOperator.hpp | 8 ++++++--
relational_operators/CreateTableOperator.hpp | 10 +++++++---
relational_operators/DeleteOperator.cpp | 2 ++
relational_operators/DeleteOperator.hpp | 12 +++++++++---
relational_operators/DestroyHashOperator.cpp | 5 +++--
relational_operators/DestroyHashOperator.hpp | 9 +++++++--
relational_operators/WorkOrder.hpp | 2 +-
relational_operators/WorkOrderFactory.cpp | 5 ++++-
.../tests/HashJoinOperator_unittest.cpp | 12 ++++++------
10 files changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index f5e47bd..60389cb 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -794,8 +794,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
insert_destination_proto->set_relational_op_index(join_operator_index);
const QueryPlan::DAGNodeIndex destroy_operator_index =
- execution_plan_->addRelationalOperator(
- new DestroyHashOperator(join_hash_table_index));
+ execution_plan_->addRelationalOperator(new DestroyHashOperator(
+ join_hash_table_index, query_handle_->query_id()));
if (!build_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(build_operator_index,
@@ -993,6 +993,7 @@ void ExecutionGenerator::convertCreateIndex(
}
execution_plan_->addRelationalOperator(new CreateIndexOperator(input_relation,
physical_plan->index_name(),
+ query_handle_->query_id(),
std::move(index_description)));
}
@@ -1033,7 +1034,8 @@ void ExecutionGenerator::convertCreateTable(
}
execution_plan_->addRelationalOperator(
- new CreateTableOperator(catalog_relation.release(),
+ new CreateTableOperator(query_handle_->query_id(),
+ catalog_relation.release(),
optimizer_context_->catalog_database()));
}
@@ -1075,7 +1077,8 @@ void ExecutionGenerator::convertDeleteTuples(
execution_plan_->addRelationalOperator(new DeleteOperator(
*input_relation_info->relation,
execution_predicate_index,
- input_relation_info->isStoredRelation()));
+ input_relation_info->isStoredRelation(),
+ query_handle_->query_id()));
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(delete_tuples_index,
input_relation_info->producer_operator_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index 2bfacc4..cb614ad 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_CREATE_INDEX_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_CREATE_INDEX_OPERATOR_HPP_
+#include <cstddef>
#include <string>
#include "catalog/CatalogRelation.hpp"
@@ -52,19 +53,22 @@ class CreateIndexOperator : public RelationalOperator {
*
* @param relation The relation to create index upon.
* @param index_name The index to create.
+ * @param The ID of the query to which this operator belongs.
* @param index_description The index_description associated with this index.
**/
CreateIndexOperator(CatalogRelation *relation,
const std::string &index_name,
+ const std::size_t query_id,
IndexSubBlockDescription &&index_description) // NOLINT(whitespace/operators)
- : relation_(DCHECK_NOTNULL(relation)),
+ : RelationalOperator(query_id),
+ relation_(DCHECK_NOTNULL(relation)),
index_name_(index_name),
index_description_(index_description) {}
~CreateIndexOperator() override {}
/**
- * @note no WorkOrder generated for this operator.
+ * @note No WorkOrder generated for this operator.
**/
bool getAllWorkOrders(WorkOrdersContainer *container,
QueryContext *query_context,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/CreateTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 98f3253..1439a2b 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_CREATE_TABLE_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_CREATE_TABLE_OPERATOR_HPP_
+#include <cstddef>
#include <memory>
#include "catalog/CatalogRelation.hpp"
@@ -49,20 +50,23 @@ class CreateTableOperator : public RelationalOperator {
/**
* @brief Constructor.
*
+ * @param The ID of the query to which this operator belongs.
* @param relation The relation to add. This CreateTableOperator owns
* relation until the WorkOrder it produces is successfully executed,
* at which point it is owned by database.
* @param database The database to add a relation to.
**/
- CreateTableOperator(CatalogRelation *relation,
+ CreateTableOperator(const std::size_t query_id,
+ CatalogRelation *relation,
CatalogDatabase *database)
- : relation_(DCHECK_NOTNULL(relation)),
+ : RelationalOperator(query_id),
+ relation_(DCHECK_NOTNULL(relation)),
database_(DCHECK_NOTNULL(database)) {}
~CreateTableOperator() override {}
/**
- * @note no WorkOrder generated for this operator.
+ * @note No WorkOrder generated for this operator.
**/
bool getAllWorkOrders(WorkOrdersContainer *container,
QueryContext *query_context,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 2c2c6de..94169ed 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -60,6 +60,7 @@ bool DeleteOperator::getAllWorkOrders(
storage_manager,
op_index_,
scheduler_client_id,
+ query_id_,
bus),
op_index_);
}
@@ -75,6 +76,7 @@ bool DeleteOperator::getAllWorkOrders(
storage_manager,
op_index_,
scheduler_client_id,
+ query_id_,
bus),
op_index_);
++num_workorders_generated_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index 1d44552..9b6a44d 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -61,11 +61,14 @@ class DeleteOperator : public RelationalOperator {
* tuples will be deleted).
* @param relation_is_stored If relation is a stored relation and is fully
* available to the operator before it can start generating workorders.
+ * @param The ID of the query to which this operator belongs.
**/
DeleteOperator(const CatalogRelation &relation,
const QueryContext::predicate_id predicate_index,
- const bool relation_is_stored)
- : relation_(relation),
+ const bool relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ relation_(relation),
predicate_index_(predicate_index),
relation_is_stored_(relation_is_stored),
started_(false),
@@ -127,6 +130,7 @@ class DeleteWorkOrder : public WorkOrder {
* @param delete_operator_index The index of the Delete Operator in the query
* plan DAG.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
+ * @param The ID of the query to which this operator belongs.
* @param bus A pointer to the TMB.
**/
DeleteWorkOrder(const CatalogRelationSchema &input_relation,
@@ -135,8 +139,10 @@ class DeleteWorkOrder : public WorkOrder {
StorageManager *storage_manager,
const std::size_t delete_operator_index,
const tmb::client_id scheduler_client_id,
+ const std::size_t query_id,
MessageBus *bus)
- : input_relation_(input_relation),
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/DestroyHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index c2220d5..c92117a 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -32,8 +32,9 @@ bool DestroyHashOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
if (blocking_dependencies_met_ && !work_generated_) {
work_generated_ = true;
- container->addNormalWorkOrder(new DestroyHashWorkOrder(hash_table_index_, query_context),
- op_index_);
+ container->addNormalWorkOrder(
+ new DestroyHashWorkOrder(hash_table_index_, query_id_, query_context),
+ op_index_);
}
return work_generated_;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/DestroyHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 46331ba..d799e05 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -47,9 +47,12 @@ class DestroyHashOperator : public RelationalOperator {
* @brief Constructor.
*
* @param hash_table_index The index of the JoinHashTable in QueryContext.
+ * @param The ID of the query to which this operator belongs.
**/
- explicit DestroyHashOperator(const QueryContext::join_hash_table_id hash_table_index)
- : hash_table_index_(hash_table_index),
+ DestroyHashOperator(const QueryContext::join_hash_table_id hash_table_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ hash_table_index_(hash_table_index),
work_generated_(false) {}
~DestroyHashOperator() override {}
@@ -76,9 +79,11 @@ class DestroyHashWorkOrder : public WorkOrder {
* @brief Constructor.
*
* @param hash_table_index The index of the JoinHashTable in QueryContext.
+ * @param The ID of the query to which this operator belongs.
* @param query_context The QueryContext to use.
**/
DestroyHashWorkOrder(const QueryContext::join_hash_table_id hash_table_index,
+ const std::size_t query_id,
QueryContext *query_context)
: hash_table_index_(hash_table_index),
query_context_(DCHECK_NOTNULL(query_context)) {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index b179d8e..5f8ad3f 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -286,7 +286,7 @@ class WorkOrder {
}
protected:
- WorkOrder(const std::size_t query_id = 0)
+ explicit WorkOrder(const std::size_t query_id = 0)
: query_id_(query_id) {}
const std::size_t query_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index b91fe2d..9828ab9 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -110,12 +110,15 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
storage_manager,
proto.GetExtension(serialization::DeleteWorkOrder::operator_index),
shiftboss_client_id,
+ proto.query_id(),
bus);
}
case serialization::DESTROY_HASH: {
LOG(INFO) << "Creating DestroyHashWorkOrder";
return new DestroyHashWorkOrder(
- proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index),
+ proto.GetExtension(
+ serialization::DestroyHashWorkOrder::join_hash_table_index),
+ proto.query_id(),
query_context);
}
case serialization::DROP_TABLE: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/58d5ccbb/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index dfec228..4ef5a5c 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -423,7 +423,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
}
// Create cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
@@ -592,7 +592,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
}
// Create cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
@@ -727,7 +727,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
}
// Create cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
@@ -893,7 +893,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
}
// Create the cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
@@ -1068,7 +1068,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
}
// Create cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
@@ -1254,7 +1254,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
}
// Create cleaner operator.
- unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+ unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0 /* dummy query ID */));
cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
[2/2] incubator-quickstep git commit: Added query ID to all the
remaining operators and work orders.
Posted by hb...@apache.org.
Added query ID to all the remaining operators and work orders.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1290dd70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1290dd70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1290dd70
Branch: refs/heads/query-id-operator-workorder
Commit: 1290dd70302d1896bbc6e9100175fceb4d7137f5
Parents: 58d5ccb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Mon Jun 6 14:32:08 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Jun 6 14:32:08 2016 -0500
----------------------------------------------------------------------
query_execution/Foreman.cpp | 3 ++
query_execution/QueryManager.cpp | 1 +
query_optimizer/ExecutionGenerator.cpp | 50 +++++++++++++-------
relational_operators/DropTableOperator.cpp | 3 +-
relational_operators/DropTableOperator.hpp | 13 +++--
.../FinalizeAggregationOperator.cpp | 6 ++-
.../FinalizeAggregationOperator.hpp | 15 ++++--
relational_operators/InsertOperator.cpp | 6 ++-
relational_operators/InsertOperator.hpp | 15 ++++--
.../NestedLoopsJoinOperator.cpp | 4 ++
.../NestedLoopsJoinOperator.hpp | 12 +++--
relational_operators/RebuildWorkOrder.hpp | 2 +
relational_operators/SampleOperator.cpp | 7 ++-
relational_operators/SampleOperator.hpp | 13 +++--
relational_operators/SaveBlocksOperator.cpp | 1 +
relational_operators/SaveBlocksOperator.hpp | 9 +++-
relational_operators/SelectOperator.cpp | 4 ++
relational_operators/SelectOperator.hpp | 24 +++++++---
relational_operators/SortMergeRunOperator.cpp | 1 +
relational_operators/SortMergeRunOperator.hpp | 12 +++--
.../SortRunGenerationOperator.cpp | 2 +
.../SortRunGenerationOperator.hpp | 12 +++--
relational_operators/TableGeneratorOperator.cpp | 7 ++-
relational_operators/TableGeneratorOperator.hpp | 13 +++--
relational_operators/TextScanOperator.cpp | 11 ++++-
relational_operators/TextScanOperator.hpp | 16 +++++--
relational_operators/UpdateOperator.cpp | 1 +
relational_operators/UpdateOperator.hpp | 12 +++--
relational_operators/WorkOrderFactory.cpp | 21 ++++++--
.../tests/AggregationOperator_unittest.cpp | 12 +++--
.../tests/SortMergeRunOperator_unittest.cpp | 6 ++-
.../SortRunGenerationOperator_unittest.cpp | 3 +-
.../tests/TextScanOperator_unittest.cpp | 3 +-
33 files changed, 240 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 304c429..b358f70 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -518,11 +518,14 @@ void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContain
for (vector<MutableBlockReference>::size_type i = 0;
i < partially_filled_block_refs.size();
++i) {
+ // Note: The query ID used below is dummy for now, it will be replaced with
+ // the true query ID when QueryManager gets used in Foreman.
container->addRebuildWorkOrder(
new RebuildWorkOrder(move(partially_filled_block_refs[i]),
index,
op.getOutputRelationID(),
foreman_client_id_,
+ 0,
bus_),
index);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index 02c5d4c..21f5820 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -461,6 +461,7 @@ void QueryManager::getRebuildWorkOrders(const dag_node_index index,
index,
op.getOutputRelationID(),
foreman_client_id_,
+ query_id_,
bus_),
index);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 60389cb..30dfa8e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -194,6 +194,7 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
const QueryPlan::DAGNodeIndex drop_table_index =
execution_plan_->addRelationalOperator(
new DropTableOperator(*temporary_relation,
+ query_handle_->query_id(),
optimizer_context_->catalog_database(),
false /* only_drop_blocks */));
DCHECK(!temporary_relation_info.isStoredRelation());
@@ -415,7 +416,8 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
insert_destination_index,
input_relation_info->isStoredRelation(),
physical_sample->is_block_sample(),
- physical_sample->percentage());
+ physical_sample->percentage(),
+ query_handle_->query_id());
const QueryPlan::DAGNodeIndex sample_index =
execution_plan_->addRelationalOperator(sample_op);
insert_destination_proto->set_relational_op_index(sample_index);
@@ -531,13 +533,15 @@ void ExecutionGenerator::convertSelection(
insert_destination_index,
execution_predicate_index,
move(attributes),
- input_relation_info->isStoredRelation())
+ input_relation_info->isStoredRelation(),
+ query_handle_->query_id())
: new SelectOperator(*input_relation_info->relation,
*output_relation,
insert_destination_index,
execution_predicate_index,
project_expressions_group_index,
- input_relation_info->isStoredRelation());
+ input_relation_info->isStoredRelation(),
+ query_handle_->query_id());
const QueryPlan::DAGNodeIndex select_index =
execution_plan_->addRelationalOperator(op);
@@ -889,7 +893,8 @@ void ExecutionGenerator::convertNestedLoopsJoin(
execution_join_predicate_index,
project_expressions_group_index,
left_relation_info->isStoredRelation(),
- right_relation_info->isStoredRelation()));
+ right_relation_info->isStoredRelation(),
+ query_handle_->query_id()));
insert_destination_proto->set_relational_op_index(join_operator_index);
if (!left_relation_info->isStoredRelation()) {
@@ -940,12 +945,13 @@ void ExecutionGenerator::convertCopyFrom(
physical_plan->escape_strings(),
FLAGS_parallelize_load,
*output_relation,
- insert_destination_index));
+ insert_destination_index,
+ query_handle_->query_id()));
insert_destination_proto->set_relational_op_index(scan_operator_index);
const QueryPlan::DAGNodeIndex save_blocks_operator_index =
execution_plan_->addRelationalOperator(
- new SaveBlocksOperator());
+ new SaveBlocksOperator(query_handle_->query_id()));
execution_plan_->addDirectDependency(save_blocks_operator_index,
scan_operator_index,
false /* is_pipeline_breaker */);
@@ -1062,6 +1068,7 @@ void ExecutionGenerator::convertDeleteTuples(
const QueryPlan::DAGNodeIndex drop_table_index =
execution_plan_->addRelationalOperator(
new DropTableOperator(*input_relation_info->relation,
+ query_handle_->query_id(),
optimizer_context_->catalog_database(),
true /* only_drop_blocks */));
if (!input_relation_info->isStoredRelation()) {
@@ -1087,7 +1094,7 @@ void ExecutionGenerator::convertDeleteTuples(
const QueryPlan::DAGNodeIndex save_blocks_index =
execution_plan_->addRelationalOperator(
- new SaveBlocksOperator());
+ new SaveBlocksOperator(query_handle_->query_id()));
execution_plan_->addDirectDependency(save_blocks_index,
delete_tuples_index,
false /* is_pipeline_breaker */);
@@ -1105,6 +1112,7 @@ void ExecutionGenerator::convertDropTable(
execution_plan_->addRelationalOperator(
new DropTableOperator(catalog_relation,
+ query_handle_->query_id(),
optimizer_context_->catalog_database()));
}
@@ -1158,12 +1166,13 @@ void ExecutionGenerator::convertInsertTuple(
execution_plan_->addRelationalOperator(
new InsertOperator(input_relation,
insert_destination_index,
- tuple_index));
+ tuple_index,
+ query_handle_->query_id()));
insert_destination_proto->set_relational_op_index(insert_operator_index);
const QueryPlan::DAGNodeIndex save_blocks_index =
execution_plan_->addRelationalOperator(
- new SaveBlocksOperator());
+ new SaveBlocksOperator(query_handle_->query_id()));
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(insert_operator_index,
input_relation_info->producer_operator_index,
@@ -1236,14 +1245,15 @@ void ExecutionGenerator::convertInsertSelection(
insert_destination_index,
QueryContext::kInvalidPredicateId,
move(attributes),
- selection_relation_info->isStoredRelation());
+ selection_relation_info->isStoredRelation(),
+ query_handle_->query_id());
const QueryPlan::DAGNodeIndex insert_selection_index =
execution_plan_->addRelationalOperator(insert_selection_op);
insert_destination_proto->set_relational_op_index(insert_selection_index);
const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(new SaveBlocksOperator());
+ execution_plan_->addRelationalOperator(new SaveBlocksOperator(query_handle_->query_id()));
if (!selection_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(insert_selection_index,
@@ -1316,12 +1326,13 @@ void ExecutionGenerator::convertUpdateTable(
*optimizer_context_->catalog_database()->getRelationById(input_rel_id),
relocation_destination_index,
execution_predicate_index,
- update_group_index));
+ update_group_index,
+ query_handle_->query_id()));
relocation_destination_proto->set_relational_op_index(update_operator_index);
const QueryPlan::DAGNodeIndex save_blocks_index =
execution_plan_->addRelationalOperator(
- new SaveBlocksOperator());
+ new SaveBlocksOperator(query_handle_->query_id()));
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(update_operator_index,
input_relation_info->producer_operator_index,
@@ -1441,7 +1452,8 @@ void ExecutionGenerator::convertAggregate(
execution_plan_->addRelationalOperator(
new FinalizeAggregationOperator(aggr_state_index,
*output_relation,
- insert_destination_index));
+ insert_destination_index,
+ query_handle_->query_id()));
insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
@@ -1492,7 +1504,8 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
*initial_runs_relation,
initial_runs_destination_id,
sort_run_gen_config_id,
- input_relation_info->isStoredRelation()));
+ input_relation_info->isStoredRelation(),
+ query_handle_->query_id()));
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(run_generator_index,
input_relation_info->producer_operator_index,
@@ -1549,7 +1562,8 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
sort_merge_run_config_id,
64 /* merge_factor */,
physical_sort->limit(),
- false /* input_relation_is_stored */));
+ false /* input_relation_is_stored */,
+ query_handle_->query_id()));
execution_plan_->addDirectDependency(merge_run_operator_index,
run_generator_index,
false /* is_pipeline_breaker */);
@@ -1563,6 +1577,7 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
execution_plan_->addRelationalOperator(
new DropTableOperator(
*merged_runs_relation,
+ query_handle_->query_id(),
optimizer_context_->catalog_database(),
false /* only_drop_blocks */));
execution_plan_->addDirectDependency(
@@ -1600,7 +1615,8 @@ void ExecutionGenerator::convertTableGenerator(
TableGeneratorOperator *op =
new TableGeneratorOperator(*output_relation,
insert_destination_index,
- generator_function_index);
+ generator_function_index,
+ query_handle_->query_id());
const QueryPlan::DAGNodeIndex tablegen_index =
execution_plan_->addRelationalOperator(op);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/DropTableOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index f3a3a2c..256f6a1 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -45,7 +45,8 @@ bool DropTableOperator::getAllWorkOrders(
// DropTableWorkOrder only drops blocks, if any.
container->addNormalWorkOrder(
- new DropTableWorkOrder(std::move(relation_blocks), storage_manager),
+ new DropTableWorkOrder(
+ query_id_, std::move(relation_blocks), storage_manager),
op_index_);
database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index bf9b1b1..e5fc989 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_DROP_TABLE_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_DROP_TABLE_OPERATOR_HPP_
+#include <cstddef>
#include <utility>
#include <vector>
@@ -55,14 +56,17 @@ class DropTableOperator : public RelationalOperator {
* @brief Constructor.
*
* @param relation The relation to drop.
+ * @param The ID of the query to which this operator belongs.
* @param database The databse where to drop \c relation.
* @param only_drop_blocks If true, only drop the blocks belonging to \c
* relation, but leave \c relation in \c database.
**/
DropTableOperator(const CatalogRelation &relation,
+ const std::size_t query_id,
CatalogDatabase *database,
const bool only_drop_blocks = false)
- : relation_(relation),
+ : RelationalOperator(query_id),
+ relation_(relation),
database_(database),
only_drop_blocks_(only_drop_blocks),
work_generated_(false) {}
@@ -95,17 +99,20 @@ class DropTableWorkOrder : public WorkOrder {
/**
* @brief Constructor.
*
+ * @param The ID of the query to which this operator belongs.
* @param blocks The blocks to drop.
* @param storage_manager The StorageManager to use.
* @param rel_id The relation id to drop.
* @param catalog_database_cache The CatalogDatabaseCache in the distributed
* version.
**/
- DropTableWorkOrder(std::vector<block_id> &&blocks,
+ DropTableWorkOrder(const std::size_t query_id,
+ std::vector<block_id> &&blocks,
StorageManager *storage_manager,
const relation_id rel_id = kInvalidCatalogId,
CatalogDatabaseLite *catalog_database_cache = nullptr)
- : blocks_(std::move(blocks)),
+ : WorkOrder(query_id),
+ blocks_(std::move(blocks)),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
rel_id_(rel_id),
catalog_database_cache_(catalog_database_cache) {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 410ec69..1dc4188 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -38,8 +38,10 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !started_) {
started_ = true;
container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(query_context->releaseAggregationState(aggr_state_index_),
- query_context->getInsertDestination(output_destination_index_)),
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ query_context->releaseAggregationState(aggr_state_index_),
+ query_context->getInsertDestination(output_destination_index_)),
op_index_);
}
return started_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index fb9608a..da93e34 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_FINALIZE_AGGREGATION_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_FINALIZE_AGGREGATION_OPERATOR_HPP_
+#include <cstddef>
#include <memory>
#include "catalog/CatalogRelation.hpp"
@@ -57,11 +58,14 @@ class FinalizeAggregationOperator : public RelationalOperator {
* @param output_relation The output relation.
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert aggregation results.
+ * @param The ID of the query to which this operator belongs.
*/
FinalizeAggregationOperator(const QueryContext::aggregation_state_id aggr_state_index,
const CatalogRelation &output_relation,
- const QueryContext::insert_destination_id output_destination_index)
- : aggr_state_index_(aggr_state_index),
+ const QueryContext::insert_destination_id output_destination_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ aggr_state_index_(aggr_state_index),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
started_(false) {}
@@ -101,13 +105,16 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
*
* @note InsertWorkOrder takes ownership of \c state.
*
+ * @param The ID of the query to which this operator belongs.
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
*/
- FinalizeAggregationWorkOrder(AggregationOperationState *state,
+ FinalizeAggregationWorkOrder(const std::size_t query_id,
+ AggregationOperationState *state,
InsertDestination *output_destination)
- : state_(DCHECK_NOTNULL(state)),
+ : WorkOrder(query_id),
+ state_(DCHECK_NOTNULL(state)),
output_destination_(DCHECK_NOTNULL(output_destination)) {}
~FinalizeAggregationWorkOrder() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/InsertOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index 8d083e5..3ec9933 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -40,8 +40,10 @@ bool InsertOperator::getAllWorkOrders(
work_generated_ = true;
container->addNormalWorkOrder(
- new InsertWorkOrder(query_context->getInsertDestination(output_destination_index_),
- query_context->releaseTuple(tuple_index_)),
+ new InsertWorkOrder(
+ query_id_,
+ query_context->getInsertDestination(output_destination_index_),
+ query_context->releaseTuple(tuple_index_)),
op_index_);
}
return work_generated_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/InsertOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp
index 8a06c94..5ac0051 100644
--- a/relational_operators/InsertOperator.hpp
+++ b/relational_operators/InsertOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INSERT_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_INSERT_OPERATOR_HPP_
+#include <cstddef>
#include <memory>
#include "catalog/CatalogRelation.hpp"
@@ -56,11 +57,14 @@ class InsertOperator : public RelationalOperator {
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert the tuple.
* @param tuple_index The index of the tuple to insert in the QueryContext.
+ * @param The ID of the query to which this operator belongs.
**/
InsertOperator(const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
- const QueryContext::tuple_id tuple_index)
- : output_relation_(output_relation),
+ const QueryContext::tuple_id tuple_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ output_relation_(output_relation),
output_destination_index_(output_destination_index),
tuple_index_(tuple_index),
work_generated_(false) {}
@@ -100,12 +104,15 @@ class InsertWorkOrder : public WorkOrder {
*
* @note InsertWorkOrder takes ownership of \c tuple.
*
+ * @param The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to insert the tuple.
* @param tuple The tuple to insert.
**/
- InsertWorkOrder(InsertDestination *output_destination,
+ InsertWorkOrder(const std::size_t query_id,
+ InsertDestination *output_destination,
Tuple *tuple)
- : output_destination_(DCHECK_NOTNULL(output_destination)),
+ : WorkOrder(query_id),
+ output_destination_(DCHECK_NOTNULL(output_destination)),
tuple_(DCHECK_NOTNULL(tuple)) {}
~InsertWorkOrder() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index 5cc498b..317cc5d 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -82,6 +82,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrders(
right_block_id,
query_context->getPredicate(join_predicate_index_),
query_context->getScalarGroup(selection_index_),
+ query_id_,
query_context->getInsertDestination(output_destination_index_),
storage_manager),
op_index_);
@@ -171,6 +172,7 @@ std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrd
right_relation_block_ids_[right_index],
query_context->getPredicate(join_predicate_index_),
query_context->getScalarGroup(selection_index_),
+ query_id_,
query_context->getInsertDestination(output_destination_index_),
storage_manager),
op_index_);
@@ -205,6 +207,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine
right_relation_block_ids_[right_index],
join_predicate,
selection,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -224,6 +227,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine
right_block_id,
join_predicate,
selection,
+ query_id_,
output_destination,
storage_manager),
op_index_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index a52ca25..9d81d10 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -76,6 +76,7 @@ class NestedLoopsJoinOperator : public RelationalOperator {
* @param left_relation_is_stored If left_input_relation is a stored relation.
* @param right_relation_is_stored If right_input_relation is a stored
* relation.
+ * @param The ID of the query to which this operator belongs.
**/
NestedLoopsJoinOperator(const CatalogRelation &left_input_relation,
const CatalogRelation &right_input_relation,
@@ -84,8 +85,10 @@ class NestedLoopsJoinOperator : public RelationalOperator {
const QueryContext::predicate_id join_predicate_index,
const QueryContext::scalar_group_id selection_index,
bool left_relation_is_stored,
- bool right_relation_is_stored)
- : left_input_relation_(left_input_relation),
+ bool right_relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ left_input_relation_(left_input_relation),
right_input_relation_(right_input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
@@ -230,6 +233,7 @@ class NestedLoopsJoinWorkOrder : public WorkOrder {
* @param selection A list of Scalars corresponding to the relation attributes
* in \c output_destination. Each Scalar is evaluated for the joined
* tuples, and the resulting value is inserted into the join result.
+ * @param The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
**/
@@ -239,9 +243,11 @@ class NestedLoopsJoinWorkOrder : public WorkOrder {
const block_id right_block_id,
const Predicate *join_predicate,
const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager)
- : left_input_relation_(left_input_relation),
+ : WorkOrder(query_id),
+ left_input_relation_(left_input_relation),
right_input_relation_(right_input_relation),
left_block_id_(left_block_id),
right_block_id_(right_block_id),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 5443d48..88e5248 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -55,12 +55,14 @@ class RebuildWorkOrder : public WorkOrder {
* @param input_relation_id The ID of the CatalogRelation to which the given
* storage block belongs to.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
+ * @param The ID of the query to which this operator belongs.
* @param bus A pointer to the TMB.
**/
RebuildWorkOrder(MutableBlockReference &&block_ref,
const std::size_t input_operator_index,
const relation_id input_relation_id,
const client_id scheduler_client_id,
+ const std::size_t query_id,
MessageBus *bus)
: block_ref_(std::move(block_ref)),
input_operator_index_(input_operator_index),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SampleOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.cpp b/relational_operators/SampleOperator.cpp
index 6842b28..b318ce4 100644
--- a/relational_operators/SampleOperator.cpp
+++ b/relational_operators/SampleOperator.cpp
@@ -58,6 +58,7 @@ bool SampleOperator::getAllWorkOrders(
input_block_id,
is_block_sample_,
percentage_,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -72,6 +73,7 @@ bool SampleOperator::getAllWorkOrders(
input_block_id,
is_block_sample_,
percentage_,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -89,6 +91,7 @@ bool SampleOperator::getAllWorkOrders(
input_relation_block_ids_[num_workorders_generated_],
is_block_sample_,
percentage_,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -101,7 +104,9 @@ bool SampleOperator::getAllWorkOrders(
new SampleWorkOrder(input_relation_,
input_relation_block_ids_[num_workorders_generated_],
is_block_sample_,
- percentage_, output_destination,
+ percentage_,
+ query_id_,
+ output_destination,
storage_manager),
op_index_);
++num_workorders_generated_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SampleOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp
index 305de34..7f88eea 100644
--- a/relational_operators/SampleOperator.hpp
+++ b/relational_operators/SampleOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_SAMPLE_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_SAMPLE_OPERATOR_HPP_
+#include <cstddef>
#include <memory>
#include <vector>
@@ -64,15 +65,17 @@ class SampleOperator : public RelationalOperator {
* workorders.
* @param is_block_sample Flag indicating whether the sample type is block or tuple.
* @param percentage The percentage of data to be sampled.
- *
+ * @param The ID of the query to which this operator belongs.
**/
SampleOperator(const CatalogRelation &input_relation,
const CatalogRelationSchema &output_relation,
const QueryContext::insert_destination_id output_destination_index,
const bool input_relation_is_stored,
const bool is_block_sample,
- const int percentage)
- : input_relation_(input_relation),
+ const int percentage,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
input_relation_is_stored_(input_relation_is_stored),
@@ -134,9 +137,11 @@ class SampleWorkOrder : public WorkOrder {
const block_id input_block_id,
const bool is_block_sample,
const int percentage,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager)
- : input_relation_(input_relation),
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
input_block_id_(input_block_id),
is_block_sample_(is_block_sample),
percentage_(percentage),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SaveBlocksOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.cpp b/relational_operators/SaveBlocksOperator.cpp
index ac61407..3581090 100644
--- a/relational_operators/SaveBlocksOperator.cpp
+++ b/relational_operators/SaveBlocksOperator.cpp
@@ -38,6 +38,7 @@ bool SaveBlocksOperator::getAllWorkOrders(
new SaveBlocksWorkOrder(
destination_block_ids_[num_workorders_generated_],
force_,
+ query_id_,
storage_manager),
op_index_);
++num_workorders_generated_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SaveBlocksOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 49195ea..3bcc6a7 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -18,6 +18,7 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_SAVE_BLOCKS_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_SAVE_BLOCKS_OPERATOR_HPP_
+#include <cstddef>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -50,11 +51,13 @@ class SaveBlocksOperator : public RelationalOperator {
/**
* @brief Constructor for saving only modified blocks in a relation.
*
+ * @param query_id The ID of the query to which this operator belongs.
* @param force If true, force writing of all blocks to disk, otherwise only
* write dirty blocks.
**/
- explicit SaveBlocksOperator(bool force = false)
- : force_(force),
+ explicit SaveBlocksOperator(const std::size_t query_id, bool force = false)
+ : RelationalOperator(query_id),
+ force_(force),
num_workorders_generated_(0) {}
~SaveBlocksOperator() override {}
@@ -96,10 +99,12 @@ class SaveBlocksWorkOrder : public WorkOrder {
* @param save_block_id The id of the block to save.
* @param force If true, force writing of all blocks to disk, otherwise only
* write dirty blocks.
+ * @param query_id The ID of the query to which this operator belongs.
* @param storage_manager The StorageManager to use.
**/
SaveBlocksWorkOrder(const block_id save_block_id,
const bool force,
+ const std::size_t query_id,
StorageManager *storage_manager)
: save_block_id_(save_block_id),
force_(force),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 69bb434..350890d 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -49,6 +49,7 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
simple_projection_,
simple_selection_,
selection,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -63,6 +64,7 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
simple_projection_,
simple_selection_,
selection,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -91,6 +93,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
simple_projection_,
simple_selection_,
selection,
+ query_id_,
output_destination,
storage_manager,
placement_scheme_->getNUMANodeForBlock(input_block_id)),
@@ -111,6 +114,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
simple_projection_,
simple_selection_,
selection,
+ query_id_,
output_destination,
storage_manager,
placement_scheme_->getNUMANodeForBlock(block_in_partition)),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 76f4cb6..4f5b8ca 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -76,14 +76,17 @@ class SelectOperator : public RelationalOperator {
* @param input_relation_is_stored If input_relation is a stored relation and
* is fully available to the operator before it can start generating
* workorders.
+ * @param query_id The ID of the query to which this operator belongs.
**/
SelectOperator(const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::predicate_id predicate_index,
const QueryContext::scalar_group_id selection_index,
- const bool input_relation_is_stored)
- : input_relation_(input_relation),
+ const bool input_relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
predicate_index_(predicate_index),
@@ -133,14 +136,17 @@ class SelectOperator : public RelationalOperator {
* @param input_relation_is_stored If input_relation is a stored relation and
* is fully available to the operator before it can start generating
* workorders.
+ * @param query_id The ID of the query to which this operator belongs.
**/
SelectOperator(const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::predicate_id predicate_index,
std::vector<attribute_id> &&selection,
- const bool input_relation_is_stored)
- : input_relation_(input_relation),
+ const bool input_relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
predicate_index_(predicate_index),
@@ -281,6 +287,7 @@ class SelectWorkOrder : public WorkOrder {
* simple_projection is true.
* @param selection A list of Scalars which will be evaluated to project
* input tuples, used if \c simple_projection is false.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
* @param output_destination The InsertDestination to insert the selection
* results.
* @param storage_manager The StorageManager to use.
@@ -291,10 +298,12 @@ class SelectWorkOrder : public WorkOrder {
const bool simple_projection,
const std::vector<attribute_id> &simple_selection,
const std::vector<std::unique_ptr<const Scalar>> *selection,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager,
const numa_node_id numa_node = 0)
- : input_relation_(input_relation),
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
simple_projection_(simple_projection),
@@ -320,6 +329,7 @@ class SelectWorkOrder : public WorkOrder {
* simple_projection is true.
* @param selection A list of Scalars which will be evaluated to project
* input tuples, used if \c simple_projection is false.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
* @param output_destination The InsertDestination to insert the selection
* results.
* @param storage_manager The StorageManager to use.
@@ -330,10 +340,12 @@ class SelectWorkOrder : public WorkOrder {
const bool simple_projection,
std::vector<attribute_id> &&simple_selection,
const std::vector<std::unique_ptr<const Scalar>> *selection,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager,
const numa_node_id numa_node = 0)
- : input_relation_(input_relation),
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
simple_projection_(simple_projection),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 7427d44..9db8de1 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -90,6 +90,7 @@ WorkOrder *SortMergeRunOperator::createWorkOrder(
std::move(job->runs),
top_k_,
job->level,
+ query_id_,
output_destination,
storage_manager,
op_index_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index f92affe..f54e925 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -88,6 +88,7 @@ class SortMergeRunOperator : public RelationalOperator {
* \c top_k is 0.
* @param input_relation_is_stored Boolean to indicate is input relation is
* stored or streamed.
+ * @param query_id The ID of the query to which this operator belongs.
**/
SortMergeRunOperator(const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
@@ -97,8 +98,10 @@ class SortMergeRunOperator : public RelationalOperator {
const QueryContext::sort_config_id sort_config_index,
const std::size_t merge_factor,
const std::size_t top_k,
- const bool input_relation_is_stored)
- : input_relation_(input_relation),
+ const bool input_relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
sort_config_index_(sort_config_index),
@@ -216,6 +219,7 @@ class SortMergeRunWorkOrder : public WorkOrder {
* @param input_runs Input runs to merge.
* @param top_k If non-zero will merge only \c top_k tuples.
* @param merge_level Merge level in the merge tree.
+ * @param query_id The ID of the query to which this WorkOrder belongs.
* @param output_destination The InsertDestination to create new blocks.
* @param storage_manager The StorageManager to use.
* @param operator_index Merge-run operator index to send feedback messages
@@ -229,12 +233,14 @@ class SortMergeRunWorkOrder : public WorkOrder {
std::vector<merge_run_operator::Run> &&input_runs,
const std::size_t top_k,
const std::size_t merge_level,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager,
const std::size_t operator_index,
const tmb::client_id scheduler_client_id,
MessageBus *bus)
- : sort_config_(sort_config),
+ : WorkOrder(query_id),
+ sort_config_(sort_config),
run_relation_(run_relation),
input_runs_(std::move(input_runs)),
top_k_(top_k),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SortRunGenerationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp
index 9bb3f51..e352f9e 100644
--- a/relational_operators/SortRunGenerationOperator.cpp
+++ b/relational_operators/SortRunGenerationOperator.cpp
@@ -54,6 +54,7 @@ bool SortRunGenerationOperator::getAllWorkOrders(
new SortRunGenerationWorkOrder(input_relation_,
input_block_id,
sort_config,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -69,6 +70,7 @@ bool SortRunGenerationOperator::getAllWorkOrders(
input_relation_,
input_relation_block_ids_[num_workorders_generated_],
sort_config,
+ query_id_,
output_destination,
storage_manager),
op_index_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index 04290a9..7618790 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -83,13 +83,16 @@ class SortRunGenerationOperator : public RelationalOperator {
* @param input_relation_is_stored Does the input relation contain the blocks
* to sort. If \c false, the blocks are
* streamed.
+ * @param query_id The ID of the query to which this operator belongs.
**/
SortRunGenerationOperator(const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::sort_config_id sort_config_index,
- bool input_relation_is_stored)
- : input_relation_(input_relation),
+ bool input_relation_is_stored,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
sort_config_index_(sort_config_index),
@@ -152,6 +155,7 @@ class SortRunGenerationWorkOrder : public WorkOrder {
* @param input_block_id The block id.
* @param sort_config The Sort configuration specifying ORDER BY, ordering,
* and null ordering.
+ * @param query_id The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to store the sorted blocks
* of runs.
* @param storage_manager The StorageManager to use.
@@ -159,9 +163,11 @@ class SortRunGenerationWorkOrder : public WorkOrder {
SortRunGenerationWorkOrder(const CatalogRelationSchema &input_relation,
const block_id input_block_id,
const SortConfiguration &sort_config,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager)
- : input_relation_(input_relation),
+ : WorkOrder(query_id),
+ input_relation_(input_relation),
input_block_id_(input_block_id),
sort_config_(sort_config),
output_destination_(DCHECK_NOTNULL(output_destination)),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index 886d05f..fb1f743 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -42,8 +42,11 @@ bool TableGeneratorOperator::getAllWorkOrders(
// Currently the generator function is not abstracted to be parallelizable,
// so just produce one work order.
container->addNormalWorkOrder(
- new TableGeneratorWorkOrder(query_context->getGeneratorFunctionHandle(generator_function_index_),
- query_context->getInsertDestination(output_destination_index_)),
+ new TableGeneratorWorkOrder(
+ query_context->getGeneratorFunctionHandle(
+ generator_function_index_),
+ query_id_,
+ query_context->getInsertDestination(output_destination_index_)),
op_index_);
started_ = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index a26b227..27efcad 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -60,12 +60,14 @@ class TableGeneratorOperator : public RelationalOperator {
* QueryContext to insert the generated output.
* @param generator_function_index The index of the GeneratorFunctionHandle in
* the QueryContext.
- *
+ * @param query_id The ID of the query to which this operator belongs.
**/
TableGeneratorOperator(const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
- const QueryContext::generator_function_id generator_function_index)
- : output_relation_(output_relation),
+ const QueryContext::generator_function_id generator_function_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ output_relation_(output_relation),
output_destination_index_(output_destination_index),
generator_function_index_(generator_function_index),
started_(false) {
@@ -112,12 +114,15 @@ class TableGeneratorWorkOrder : public WorkOrder {
* @brief Constructor.
*
* @param generator_function The GeneratorFunctionHandle to use.
+ * @param query_id The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to insert the generated
* output.
**/
TableGeneratorWorkOrder(const GeneratorFunctionHandle &function_handle,
+ const std::size_t query_id,
InsertDestination *output_destination)
- : function_handle_(function_handle),
+ : WorkOrder(query_id),
+ function_handle_(function_handle),
output_destination_(DCHECK_NOTNULL(output_destination)) {}
~TableGeneratorWorkOrder() override {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5ede6f7..8db5ef1 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -167,6 +167,7 @@ bool TextScanOperator::getAllWorkOrders(
container->addNormalWorkOrder(
new TextSplitWorkOrder(file,
process_escape_sequences_,
+ query_id_,
storage_manager,
op_index_,
scheduler_client_id,
@@ -185,6 +186,7 @@ bool TextScanOperator::getAllWorkOrders(
blob_work.size,
field_terminator_,
process_escape_sequences_,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -204,6 +206,7 @@ bool TextScanOperator::getAllWorkOrders(
new TextScanWorkOrder(file,
field_terminator_,
process_escape_sequences_,
+ query_id_,
output_destination,
storage_manager),
op_index_);
@@ -235,9 +238,11 @@ void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &
TextScanWorkOrder::TextScanWorkOrder(const std::string &filename,
const char field_terminator,
const bool process_escape_sequences,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager)
- : is_file_(true),
+ : WorkOrder(query_id),
+ is_file_(true),
filename_(filename),
field_terminator_(field_terminator),
text_blob_(0),
@@ -253,9 +258,11 @@ TextScanWorkOrder::TextScanWorkOrder(const block_id text_blob,
const std::size_t text_size,
const char field_terminator,
const bool process_escape_sequences,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager)
- : is_file_(false),
+ : WorkOrder(query_id),
+ is_file_(false),
field_terminator_(field_terminator),
text_blob_(text_blob),
text_size_(text_size),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index a2d4ced..ad03f6a 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -134,14 +134,17 @@ class TextScanOperator : public RelationalOperator {
* @param output_relation The output relation.
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert tuples.
+ * @param query_id The ID of the query to which this operator belongs.
**/
TextScanOperator(const std::string &file_pattern,
const char field_terminator,
const bool process_escape_sequences,
const bool parallelize_load,
const CatalogRelation &output_relation,
- const QueryContext::insert_destination_id output_destination_index)
- : file_pattern_(file_pattern),
+ const QueryContext::insert_destination_id output_destination_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ file_pattern_(file_pattern),
field_terminator_(field_terminator),
process_escape_sequences_(process_escape_sequences),
parallelize_load_(parallelize_load),
@@ -202,6 +205,7 @@ class TextScanWorkOrder : public WorkOrder {
* the text file.
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
+ * @param query_id The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to insert tuples.
* @param storage_manager The StorageManager to use.
**/
@@ -209,6 +213,7 @@ class TextScanWorkOrder : public WorkOrder {
const std::string &filename,
const char field_terminator,
const bool process_escape_sequences,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager);
@@ -221,6 +226,7 @@ class TextScanWorkOrder : public WorkOrder {
* the text file.
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
+ * @param query_id The ID of the query to which this operator belongs.
* @param output_destination The InsertDestination to write the read tuples.
* @param storage_manager The StorageManager to use.
*/
@@ -229,6 +235,7 @@ class TextScanWorkOrder : public WorkOrder {
const std::size_t text_size,
const char field_terminator,
const bool process_escape_sequences,
+ const std::size_t query_id,
InsertDestination *output_destination,
StorageManager *storage_manager);
@@ -318,6 +325,7 @@ class TextSplitWorkOrder : public WorkOrder {
* @param filename File to split into row-aligned blobs.
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
+ * @param query_id The ID of the query to which this operator belongs.
* @param storage_manager The StorageManager to use.
* @param operator_index Operator index of the current operator. This is used
* to send new-work available message to Foreman.
@@ -326,11 +334,13 @@ class TextSplitWorkOrder : public WorkOrder {
*/
TextSplitWorkOrder(const std::string &filename,
const bool process_escape_sequences,
+ const std::size_t query_id,
StorageManager *storage_manager,
const std::size_t operator_index,
const tmb::client_id scheduler_client_id,
MessageBus *bus)
- : filename_(filename),
+ : WorkOrder(query_id),
+ filename_(filename),
process_escape_sequences_(process_escape_sequences),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
operator_index_(operator_index),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 7585db1..b331a9c 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -57,6 +57,7 @@ bool UpdateOperator::getAllWorkOrders(
input_block_id,
query_context->getPredicate(predicate_index_),
query_context->getUpdateGroup(update_group_index_),
+ query_id_,
query_context->getInsertDestination(relocation_destination_index_),
storage_manager,
op_index_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index 78f8fe0..f1de0eb 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -72,6 +72,7 @@ class UpdateOperator : public RelationalOperator {
* @param update_group_index The index of a update group (the map of
* attribute_ids to Scalars) which should be evaluated to get the new
* value for the corresponding attribute.
+ * @param query_id The ID of the query to which this operator belongs.
*
* @warning The constructed InsertDestination should belong to relation, but
* must NOT contain any pre-existing blocks.
@@ -79,8 +80,10 @@ class UpdateOperator : public RelationalOperator {
UpdateOperator(const CatalogRelation &relation,
const QueryContext::insert_destination_id relocation_destination_index,
const QueryContext::predicate_id predicate_index,
- const QueryContext::update_group_id update_group_index)
- : relation_(relation),
+ const QueryContext::update_group_id update_group_index,
+ const std::size_t query_id)
+ : RelationalOperator(query_id),
+ relation_(relation),
relocation_destination_index_(relocation_destination_index),
predicate_index_(predicate_index),
update_group_index_(update_group_index),
@@ -130,6 +133,7 @@ class UpdateWorkOrder : public WorkOrder {
* @param assignments The assignments (the map of attribute_ids to Scalars)
* which should be evaluated to get the new value for the corresponding
* attribute.
+ * @param query_id The ID of the query to which this operator belongs.
* @param input_block_id The block id.
* @param relocation_destination The InsertDestination to relocate tuples
* which can not be updated in-place.
@@ -143,12 +147,14 @@ class UpdateWorkOrder : public WorkOrder {
const block_id input_block_id,
const Predicate *predicate,
const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> &assignments,
+ const std::size_t query_id,
InsertDestination *relocation_destination,
StorageManager *storage_manager,
const std::size_t update_operator_index,
const tmb::client_id scheduler_client_id,
MessageBus *bus)
- : relation_(relation),
+ : WorkOrder(query_id),
+ relation_(relation),
input_block_id_(input_block_id),
predicate_(predicate),
assignments_(assignments),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 9828ab9..fdd694f 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -130,6 +130,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
}
return new DropTableWorkOrder(
+ proto.query_id(),
move(blocks),
storage_manager,
proto.HasExtension(serialization::DropTableWorkOrder::relation_id)
@@ -140,10 +141,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
case serialization::FINALIZE_AGGREGATION: {
LOG(INFO) << "Creating FinalizeAggregationWorkOrder";
return new FinalizeAggregationWorkOrder(
- query_context->releaseAggregationState(
- proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
+ proto.query_id(),
+ query_context->releaseAggregationState(proto.GetExtension(
+ serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
query_context->getInsertDestination(
- proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+ proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
+ insert_destination_index)));
}
case serialization::HASH_JOIN: {
const auto hash_join_work_order_type =
@@ -262,6 +265,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
case serialization::INSERT: {
LOG(INFO) << "Creating InsertWorkOrder";
return new InsertWorkOrder(
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
query_context->releaseTuple(
@@ -280,6 +284,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index)),
query_context->getScalarGroup(
proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index)),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)),
storage_manager);
@@ -292,6 +297,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::SampleWorkOrder::block_id),
proto.GetExtension(serialization::SampleWorkOrder::is_block_sample),
proto.GetExtension(serialization::SampleWorkOrder::percentage),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::SampleWorkOrder::insert_destination_index)),
storage_manager);
@@ -301,6 +307,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
return new SaveBlocksWorkOrder(
proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
+ proto.query_id(),
storage_manager);
}
case serialization::SELECT: {
@@ -324,6 +331,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
simple_projection ? nullptr
: &query_context->getScalarGroup(
proto.GetExtension(serialization::SelectWorkOrder::selection_index)),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
storage_manager);
@@ -349,6 +357,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
move(runs),
proto.GetExtension(serialization::SortMergeRunWorkOrder::top_k),
proto.GetExtension(serialization::SortMergeRunWorkOrder::merge_level),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index)),
storage_manager,
@@ -364,6 +373,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::SortRunGenerationWorkOrder::block_id),
query_context->getSortConfig(
proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)),
storage_manager);
@@ -373,6 +383,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
return new TableGeneratorWorkOrder(
query_context->getGeneratorFunctionHandle(
proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
}
@@ -383,6 +394,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::TextScanWorkOrder::filename),
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
storage_manager);
@@ -395,6 +407,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
text_blob_proto.size(),
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
storage_manager);
@@ -405,6 +418,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
return new TextSplitWorkOrder(
proto.GetExtension(serialization::TextSplitWorkOrder::filename),
proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences),
+ proto.query_id(),
storage_manager,
proto.GetExtension(serialization::TextSplitWorkOrder::operator_index),
shiftboss_client_id,
@@ -420,6 +434,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)),
query_context->getUpdateGroup(
proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)),
+ proto.query_id(),
query_context->getInsertDestination(
proto.GetExtension(serialization::UpdateWorkOrder::insert_destination_index)),
storage_manager,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 2c408df..ace7951 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -281,8 +281,10 @@ class AggregationOperatorTest : public ::testing::Test {
insert_destination_proto->set_relation_id(result_table_->getID());
insert_destination_proto->set_relational_op_index(kOpIndex);
- finalize_op_.reset(
- new FinalizeAggregationOperator(aggr_state_index, *result_table_, insert_destination_index));
+ finalize_op_.reset(new FinalizeAggregationOperator(aggr_state_index,
+ *result_table_,
+ insert_destination_index,
+ 0 /* dummy query ID */));
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,
@@ -363,8 +365,10 @@ class AggregationOperatorTest : public ::testing::Test {
insert_destination_proto->set_relation_id(result_table_->getID());
insert_destination_proto->set_relational_op_index(kOpIndex);
- finalize_op_.reset(
- new FinalizeAggregationOperator(aggr_state_index, *result_table_, insert_destination_index));
+ finalize_op_.reset(new FinalizeAggregationOperator(aggr_state_index,
+ *result_table_,
+ insert_destination_index,
+ 0 /* dummy query ID */));
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index 50c508d..244091f 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -1573,7 +1573,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
sort_config_index,
merge_factor,
top_k,
- true));
+ true,
+ 0 /* dummy query ID */));
merge_op_->setOperatorIndex(kOpIndex);
// Set up the QueryContext.
@@ -1616,7 +1617,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
sort_config_index,
merge_factor,
top_k,
- false));
+ false,
+ 0 /* dummy query ID */));
merge_op_->setOperatorIndex(kOpIndex);
// Set up the QueryContext.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 7491778..6f24b92 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -358,7 +358,8 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
*result_table_,
insert_destination_index,
sort_config_index,
- true /* is_stored */));
+ true /* is_stored */,
+ 0 /* dummy query ID */));
run_gen->setOperatorIndex(kOpIndex);
// Set up the QueryContext.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1290dd70/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 1dfad7b..7626686 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -193,7 +193,8 @@ TEST_F(TextScanOperatorTest, ScanTest) {
true,
false,
*relation_,
- output_destination_index));
+ output_destination_index,
+ 0 /* dummy query ID */));
// Setup query_context_.
query_context_.reset(new QueryContext(query_context_proto,