You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 00:31:48 UTC
incubator-quickstep git commit: Added the support for partitioned
(but not parallel) aggregations.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 361a65fa6 -> e6ac59d5a
Added the support for partitioned (but not parallel) aggregations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e6ac59d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e6ac59d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e6ac59d5
Branch: refs/heads/master
Commit: e6ac59d5a966f23a4fad3d907be7cc9b4ba53820
Parents: 361a65f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri May 26 02:08:57 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 19:00:28 2017 -0500
----------------------------------------------------------------------
query_execution/BlockLocator.hpp | 2 +
query_execution/ForemanDistributed.cpp | 14 +++-
query_execution/PolicyEnforcerDistributed.cpp | 2 +
query_execution/PolicyEnforcerDistributed.hpp | 2 +
query_execution/QueryContext.cpp | 24 +++++--
query_execution/QueryContext.hpp | 26 +++++---
query_execution/QueryContext.proto | 7 +-
query_execution/QueryManagerDistributed.cpp | 5 +-
query_execution/QueryManagerDistributed.hpp | 16 +++--
query_optimizer/ExecutionGenerator.cpp | 57 ++++++++++++----
relational_operators/AggregationOperator.cpp | 62 ++++++++++-------
relational_operators/AggregationOperator.hpp | 38 ++++++++---
.../BuildAggregationExistenceMapOperator.cpp | 64 +++++++++++-------
.../BuildAggregationExistenceMapOperator.hpp | 37 ++++++++---
relational_operators/CMakeLists.txt | 4 ++
.../DestroyAggregationStateOperator.cpp | 24 ++++---
.../DestroyAggregationStateOperator.hpp | 12 +++-
.../FinalizeAggregationOperator.cpp | 58 ++++++++++------
.../FinalizeAggregationOperator.hpp | 18 +++--
.../InitializeAggregationOperator.cpp | 44 ++++++++----
.../InitializeAggregationOperator.hpp | 15 +++--
relational_operators/WorkOrder.proto | 19 +++++-
relational_operators/WorkOrderFactory.cpp | 70 ++++++++++++++++----
.../tests/AggregationOperator_unittest.cpp | 17 +++--
24 files changed, 457 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 82c28ae..01492f9 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -123,6 +123,8 @@ class BlockLocator : public Thread {
* @return Whether the block locality info has been found.
**/
bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+ if (block == kInvalidBlockId) { return false; }
+
const std::unordered_set<block_id_domain> block_domains = getBlockDomains(block);
if (!block_domains.empty()) {
// NOTE(zuyu): This lock is held for the remaining duration of this call, as the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 34b5b76..49f2101 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,12 +243,14 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
size_t *shiftboss_index_for_aggregation) {
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::aggregation_state_id aggr_state_index;
+ partition_id part_id;
vector<QueryContext::lip_filter_id> lip_filter_indexes;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+ part_id = work_order_proto.GetExtension(S::AggregationWorkOrder::partition_id);
for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
@@ -256,18 +258,28 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
break;
+ case S::BUILD_AGGREGATION_EXISTENCE_MAP:
+ aggr_state_index = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::aggr_state_index);
+ part_id = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::partition_id);
+ break;
+ case S::INITIALIZE_AGGREGATION:
+ aggr_state_index = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::aggr_state_index);
+ part_id = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::partition_id);
+ break;
case S::FINALIZE_AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
+ part_id = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::partition_id);
break;
case S::DESTROY_AGGREGATION_STATE:
aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
+ part_id = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::partition_id);
break;
default:
return false;
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
- proto.query_id(), aggr_state_index, lip_filter_indexes, block_locator_, block,
+ proto.query_id(), aggr_state_index, part_id, lip_filter_indexes, block_locator_, block,
next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 7e9a81d..766c351 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,6 +192,7 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const partition_id part_id,
const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
@@ -200,6 +201,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+ part_id,
lip_filter_indexes,
block_locator,
block,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e24f8cf..23b0017 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -133,6 +133,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
*
* @param query_id The query id.
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param part_id The partition ID.
* @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
@@ -142,6 +143,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const partition_id part_id,
const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 8ba77ab..52aa4dc 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -64,10 +64,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
<< proto.DebugString();
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
- aggregation_states_.emplace_back(
- AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
- database,
- storage_manager));
+ PartitionedAggregationOperationStates partitioned_aggregation_states;
+ const serialization::QueryContext::AggregationOperationStateContext &aggr_state_context_proto =
+ proto.aggregation_states(i);
+ for (std::uint64_t j = 0; j < aggr_state_context_proto.num_partitions(); ++j) {
+ partitioned_aggregation_states.emplace_back(
+ AggregationOperationState::ReconstructFromProto(aggr_state_context_proto.aggregation_state(),
+ database,
+ storage_manager));
+ }
+ aggregation_states_.push_back(move(partitioned_aggregation_states));
}
for (int i = 0; i < proto.generator_functions_size(); ++i) {
@@ -167,7 +173,7 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database) {
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
- if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i), database)) {
+ if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i).aggregation_state(), database)) {
return false;
}
}
@@ -293,8 +299,12 @@ std::size_t QueryContext::getAggregationStatesMemoryBytes() const {
for (std::size_t agg_state_id = 0;
agg_state_id < aggregation_states_.size();
++agg_state_id) {
- if (aggregation_states_[agg_state_id] != nullptr) {
- memory += aggregation_states_[agg_state_id]->getMemoryConsumptionBytes();
+ for (std::size_t part_id = 0;
+ part_id < aggregation_states_[agg_state_id].size();
+ ++part_id) {
+ if (aggregation_states_[agg_state_id][part_id] != nullptr) {
+ memory += aggregation_states_[agg_state_id][part_id]->getMemoryConsumptionBytes();
+ }
}
}
return memory;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index ebc9506..7876821 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -169,38 +169,44 @@ class QueryContext {
* @brief Whether the given AggregationOperationState id is valid.
*
* @param id The AggregationOperationState id.
+ * @param part_id The partition id.
*
* @return True if valid, otherwise false.
**/
- bool isValidAggregationStateId(const aggregation_state_id id) const {
+ bool isValidAggregationStateId(const aggregation_state_id id, const partition_id part_id) const {
SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
- return id < aggregation_states_.size();
+ return id < aggregation_states_.size() &&
+ part_id < aggregation_states_[id].size();
}
/**
* @brief Get the AggregationOperationState.
*
* @param id The AggregationOperationState id in the query.
+ * @param part_id The partition id.
*
* @return The AggregationOperationState, already created in the constructor.
**/
- inline AggregationOperationState* getAggregationState(const aggregation_state_id id) {
+ inline AggregationOperationState* getAggregationState(const aggregation_state_id id, const partition_id part_id) {
SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- return aggregation_states_[id].get();
+ DCHECK_LT(part_id, aggregation_states_[id].size());
+ DCHECK(aggregation_states_[id][part_id]);
+ return aggregation_states_[id][part_id].get();
}
/**
* @brief Destroy the given aggregation state.
*
* @param id The ID of the AggregationOperationState to destroy.
+ * @param part_id The partition id.
**/
- inline void destroyAggregationState(const aggregation_state_id id) {
+ inline void destroyAggregationState(const aggregation_state_id id, const partition_id part_id) {
SpinSharedMutexExclusiveLock<false> lock(aggregation_states_mutex_);
DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- aggregation_states_[id].reset(nullptr);
+ DCHECK_LT(part_id, aggregation_states_[id].size());
+ DCHECK(aggregation_states_[id][part_id]);
+ aggregation_states_[id][part_id].reset(nullptr);
}
/**
@@ -611,10 +617,12 @@ class QueryContext {
part_id < join_hash_tables_[id].size();
}
+ // Per AggregationOperationState, the index is the partition id.
+ typedef std::vector<std::unique_ptr<AggregationOperationState>> PartitionedAggregationOperationStates;
// Per hash join, the index is the partition id.
typedef std::vector<std::unique_ptr<JoinHashTable>> PartitionedJoinHashTables;
- std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
+ std::vector<PartitionedAggregationOperationStates> aggregation_states_;
std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
std::vector<PartitionedJoinHashTables> join_hash_tables_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 599daa7..b76374c 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -30,6 +30,11 @@ import "utility/SortConfiguration.proto";
import "utility/lip_filter/LIPFilter.proto";
message QueryContext {
+ message AggregationOperationStateContext {
+ required AggregationOperationState aggregation_state = 1;
+ optional uint64 num_partitions = 2 [default = 1];
+ }
+
message HashTableContext {
required HashTable join_hash_table = 1;
optional uint64 num_partitions = 2 [default = 1];
@@ -50,7 +55,7 @@ message QueryContext {
repeated UpdateAssignment update_assignments = 2;
}
- repeated AggregationOperationState aggregation_states = 1;
+ repeated AggregationOperationStateContext aggregation_states = 1;
repeated GeneratorFunctionHandle generator_functions = 2;
repeated HashTableContext join_hash_tables = 3;
repeated InsertDestination insert_destinations = 4;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6c293a5..77a605e 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -76,7 +76,10 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
}
const serialization::QueryContext &query_context_proto = query_handle->getQueryContextProto();
- shiftboss_indexes_for_aggrs_.resize(query_context_proto.aggregation_states_size(), kInvalidShiftbossIndex);
+ for (int i = 0; i < query_context_proto.aggregation_states_size(); ++i) {
+ shiftboss_indexes_for_aggrs_.push_back(
+ vector<size_t>(query_context_proto.aggregation_states(i).num_partitions(), kInvalidShiftbossIndex));
+ }
for (int i = 0; i < query_context_proto.join_hash_tables_size(); ++i) {
shiftboss_indexes_for_hash_joins_.push_back(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 6d454cc..6490eb7 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -105,6 +105,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
* <next_shiftboss_index_to_schedule>.
*
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param part_id The partition ID.
* @param lip_filter_indexes The LIP filter indexes.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
@@ -112,21 +113,24 @@ class QueryManagerDistributed final : public QueryManagerBase {
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+ const partition_id part_id,
const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
- if (shiftboss_indexes_for_aggrs_[aggr_state_index] != kInvalidShiftbossIndex) {
- *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+ DCHECK_LT(part_id, shiftboss_indexes_for_aggrs_[aggr_state_index].size());
+
+ if (shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] != kInvalidShiftbossIndex) {
+ *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index][part_id];
return;
}
getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
shiftboss_index);
- shiftboss_indexes_for_aggrs_[aggr_state_index] = *shiftboss_index;
+ shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] = *shiftboss_index;
}
/**
@@ -259,9 +263,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
- // From an aggregation id (QueryContext::aggregation_state_id) to its
- // scheduled Shiftboss index.
- std::vector<std::size_t> shiftboss_indexes_for_aggrs_;
+ // Get the scheduled Shiftboss index given
+ // [QueryContext::aggregation_state_id][partition_id].
+ std::vector<std::vector<std::size_t>> shiftboss_indexes_for_aggrs_;
// Get the scheduled Shiftboss index given
// [QueryContext::join_hash_table_id][partition_id].
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2dbcf16..3b2fe08 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1580,14 +1580,25 @@ void ExecutionGenerator::convertUpdateTable(
void ExecutionGenerator::convertAggregate(
const P::AggregatePtr &physical_plan) {
+ const CatalogRelationInfo *input_relation_info =
+ findRelationInfoOutputByPhysical(physical_plan->input());
+ const CatalogRelation *input_relation = input_relation_info->relation;
+ const PartitionScheme *input_partition_scheme = input_relation->getPartitionScheme();
+ const size_t num_partitions =
+ input_partition_scheme
+ ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+ : 1u;
+
// Create aggr state proto.
const QueryContext::aggregation_state_id aggr_state_index =
query_context_proto_->aggregation_states_size();
- S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+ S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto =
+ query_context_proto_->add_aggregation_states();
+ aggr_state_context_proto->set_num_partitions(num_partitions);
- const CatalogRelationInfo *input_relation_info =
- findRelationInfoOutputByPhysical(physical_plan->input());
- aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ S::AggregationOperationState *aggr_state_proto =
+ aggr_state_context_proto->mutable_aggregation_state();
+ aggr_state_proto->set_relation_id(input_relation->getID());
bool use_parallel_initialization = false;
@@ -1682,7 +1693,8 @@ void ExecutionGenerator::convertAggregate(
query_handle_->query_id(),
*input_relation_info->relation,
input_relation_info->isStoredRelation(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(aggregation_operator_index,
@@ -1695,7 +1707,8 @@ void ExecutionGenerator::convertAggregate(
execution_plan_->addRelationalOperator(
new InitializeAggregationOperator(
query_handle_->query_id(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
execution_plan_->addDirectDependency(aggregation_operator_index,
initialize_aggregation_operator_index,
@@ -1715,6 +1728,7 @@ void ExecutionGenerator::convertAggregate(
execution_plan_->addRelationalOperator(
new FinalizeAggregationOperator(query_handle_->query_id(),
aggr_state_index,
+ num_partitions,
*output_relation,
insert_destination_index));
@@ -1734,7 +1748,8 @@ void ExecutionGenerator::convertAggregate(
const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
execution_plan_->addRelationalOperator(
new DestroyAggregationStateOperator(query_handle_->query_id(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
finalize_aggregation_operator_index,
@@ -1755,13 +1770,22 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
findRelationInfoOutputByPhysical(physical_plan->left_child());
const CatalogRelationInfo *right_relation_info =
findRelationInfoOutputByPhysical(physical_plan->right_child());
+ const CatalogRelation &right_relation = *right_relation_info->relation;
+
+ // TODO(quickstep-team): Support partitioned aggregation.
+ CHECK(!right_relation.hasPartitionScheme());
+ const std::size_t num_partitions = 1u;
// Create aggr state proto.
const QueryContext::aggregation_state_id aggr_state_index =
query_context_proto_->aggregation_states_size();
- S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+ S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto =
+ query_context_proto_->add_aggregation_states();
+ aggr_state_context_proto->set_num_partitions(num_partitions);
- aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+ S::AggregationOperationState *aggr_state_proto =
+ aggr_state_context_proto->mutable_aggregation_state();
+ aggr_state_proto->set_relation_id(right_relation.getID());
// Group by the right join attribute.
std::unique_ptr<const Scalar> execution_group_by_expression(
@@ -1807,7 +1831,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
execution_plan_->addRelationalOperator(
new InitializeAggregationOperator(
query_handle_->query_id(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
execution_plan_->addRelationalOperator(
@@ -1816,7 +1841,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
*left_relation_info->relation,
physical_plan->left_join_attributes().front()->id(),
left_relation_info->isStoredRelation(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
if (!left_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
@@ -1828,9 +1854,10 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
execution_plan_->addRelationalOperator(
new AggregationOperator(
query_handle_->query_id(),
- *right_relation_info->relation,
+ right_relation,
right_relation_info->isStoredRelation(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
if (!right_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(aggregation_operator_index,
@@ -1862,6 +1889,7 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
execution_plan_->addRelationalOperator(
new FinalizeAggregationOperator(query_handle_->query_id(),
aggr_state_index,
+ num_partitions,
*output_relation,
insert_destination_index));
@@ -1881,7 +1909,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
execution_plan_->addRelationalOperator(
new DestroyAggregationStateOperator(query_handle_->query_id(),
- aggr_state_index));
+ aggr_state_index,
+ num_partitions));
execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
finalize_aggregation_operator_index,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index c774719..2618e01 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -21,6 +21,7 @@
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
@@ -41,29 +42,35 @@ bool AggregationOperator::getAllWorkOrders(
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
container->addNormalWorkOrder(
new AggregationWorkOrder(
query_id_,
input_block_id,
- query_context->getAggregationState(aggr_state_index_),
+ query_context->getAggregationState(aggr_state_index_, part_id),
CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
op_index_);
}
- started_ = true;
}
- return started_;
+ started_ = true;
+ return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addNormalWorkOrder(
- new AggregationWorkOrder(
- query_id_,
- input_relation_block_ids_[num_workorders_generated_],
- query_context->getAggregationState(aggr_state_index_),
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new AggregationWorkOrder(
+ query_id_,
+ input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+ query_context->getAggregationState(aggr_state_index_, part_id),
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
@@ -71,31 +78,38 @@ bool AggregationOperator::getAllWorkOrders(
bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(createWorkOrderProto(input_block_id, part_id), op_index_);
}
- started_ = true;
}
+ started_ = true;
return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addWorkOrderProto(
- createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], part_id),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
}
-serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block, const partition_id part_id) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::AGGREGATION);
proto->set_query_id(query_id_);
proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::AggregationWorkOrder::partition_id, part_id);
proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 93f4550..1d37e50 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -20,11 +20,13 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATOR_HPP_
+#include <cstddef>
#include <string>
#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -66,19 +68,34 @@ class AggregationOperator : public RelationalOperator {
* is fully available to the operator before it can start generating
* workorders.
* @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param num_partitions The number of partitions in 'input_relation'. If no
+ * partitions, it is one.
**/
AggregationOperator(const std::size_t query_id,
const CatalogRelation &input_relation,
bool input_relation_is_stored,
- const QueryContext::aggregation_state_id aggr_state_index)
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t num_partitions)
: RelationalOperator(query_id),
input_relation_(input_relation),
input_relation_is_stored_(input_relation_is_stored),
- input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
aggr_state_index_(aggr_state_index),
- num_workorders_generated_(0),
- started_(false) {}
+ num_partitions_(num_partitions),
+ input_relation_block_ids_(num_partitions),
+ num_workorders_generated_(num_partitions),
+ started_(false) {
+ if (input_relation_is_stored) {
+ if (input_relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ // No partition.
+ input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+ }
+ }
+ }
~AggregationOperator() override {}
@@ -104,7 +121,7 @@ class AggregationOperator : public RelationalOperator {
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
const partition_id part_id) override {
- input_relation_block_ids_.push_back(input_block_id);
+ input_relation_block_ids_[part_id].push_back(input_block_id);
}
private:
@@ -112,15 +129,18 @@ class AggregationOperator : public RelationalOperator {
* @brief Create Work Order proto.
*
* @param block The block id used in the Work Order.
+ * @param part_id The partition id of 'block'.
**/
- serialization::WorkOrder* createWorkOrderProto(const block_id block);
+ serialization::WorkOrder* createWorkOrderProto(const block_id block, const partition_id part_id);
const CatalogRelation &input_relation_;
const bool input_relation_is_stored_;
- std::vector<block_id> input_relation_block_ids_;
const QueryContext::aggregation_state_id aggr_state_index_;
+ const std::size_t num_partitions_;
- std::vector<block_id>::size_type num_workorders_generated_;
+ // The index is the partition id.
+ std::vector<BlocksInPartition> input_relation_block_ids_;
+ std::vector<std::size_t> num_workorders_generated_;
bool started_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperator);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
index ff65265..5552b75 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.cpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -88,33 +88,39 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
container->addNormalWorkOrder(
new BuildAggregationExistenceMapWorkOrder(
query_id_,
input_relation_,
input_block_id,
build_attribute_,
- query_context->getAggregationState(aggr_state_index_),
+ query_context->getAggregationState(aggr_state_index_, part_id),
storage_manager),
op_index_);
}
- started_ = true;
}
+ started_ = true;
return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addNormalWorkOrder(
- new BuildAggregationExistenceMapWorkOrder(
- query_id_,
- input_relation_,
- input_relation_block_ids_[num_workorders_generated_],
- build_attribute_,
- query_context->getAggregationState(aggr_state_index_),
- storage_manager),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new BuildAggregationExistenceMapWorkOrder(
+ query_id_,
+ input_relation_,
+ input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+ build_attribute_,
+ query_context->getAggregationState(aggr_state_index_, part_id),
+ storage_manager),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
@@ -123,26 +129,32 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
bool BuildAggregationExistenceMapOperator
::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (input_relation_is_stored_) {
- if (!started_) {
- for (const block_id block : input_relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id block : input_relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(createWorkOrderProto(block, part_id), op_index_);
}
- started_ = true;
}
+ started_ = true;
return true;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addWorkOrderProto(
- createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], part_id),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
}
serialization::WorkOrder* BuildAggregationExistenceMapOperator
- ::createWorkOrderProto(const block_id block) {
+ ::createWorkOrderProto(const block_id block, const partition_id part_id) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
proto->set_query_id(query_id_);
@@ -155,6 +167,8 @@ serialization::WorkOrder* BuildAggregationExistenceMapOperator
build_attribute_);
proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index,
aggr_state_index_);
+ proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id,
+ part_id);
return proto;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index b28b0b4..b29ad4a 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -21,12 +21,12 @@
#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
#include <cstddef>
-
#include <string>
#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -70,21 +70,36 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
* is fully available to the operator before it can start generating
* workorders.
* @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param num_partitions The number of partitions in 'input_relation'. If no
+ * partitions, it is one.
**/
BuildAggregationExistenceMapOperator(const std::size_t query_id,
const CatalogRelation &input_relation,
const attribute_id build_attribute,
const bool input_relation_is_stored,
- const QueryContext::aggregation_state_id aggr_state_index)
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t num_partitions)
: RelationalOperator(query_id),
input_relation_(input_relation),
build_attribute_(build_attribute),
input_relation_is_stored_(input_relation_is_stored),
aggr_state_index_(aggr_state_index),
- input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_workorders_generated_(0),
- started_(false) {}
+ num_partitions_(num_partitions),
+ input_relation_block_ids_(num_partitions),
+ num_workorders_generated_(num_partitions),
+ started_(false) {
+ if (input_relation_is_stored) {
+ if (input_relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ // No partition.
+ input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+ }
+ }
+ }
~BuildAggregationExistenceMapOperator() override {}
@@ -113,19 +128,21 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
const partition_id part_id) override {
- input_relation_block_ids_.push_back(input_block_id);
+ input_relation_block_ids_[part_id].push_back(input_block_id);
}
private:
- serialization::WorkOrder* createWorkOrderProto(const block_id block);
+ serialization::WorkOrder* createWorkOrderProto(const block_id block, const partition_id part_id);
const CatalogRelation &input_relation_;
const attribute_id build_attribute_;
const bool input_relation_is_stored_;
const QueryContext::aggregation_state_id aggr_state_index_;
+ const std::size_t num_partitions_;
- std::vector<block_id> input_relation_block_ids_;
- std::vector<block_id>::size_type num_workorders_generated_;
+ // The index is the partition id.
+ std::vector<BlocksInPartition> input_relation_block_ids_;
+ std::vector<std::size_t> num_workorders_generated_;
bool started_;
DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 5b3f009..5ad9c3b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -87,6 +87,7 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
glog
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
@@ -105,6 +106,7 @@ target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMap
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
@@ -205,6 +207,7 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator
tmb)
target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator
glog
+ quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
@@ -581,6 +584,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_SampleOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 62ca9e7..013bf18 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -19,6 +19,7 @@
#include "relational_operators/DestroyAggregationStateOperator.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
@@ -36,9 +37,11 @@ bool DestroyAggregationStateOperator::getAllWorkOrders(
tmb::MessageBus *bus) {
if (blocking_dependencies_met_ && !work_generated_) {
work_generated_ = true;
- container->addNormalWorkOrder(
- new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, query_context),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ container->addNormalWorkOrder(
+ new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
+ op_index_);
+ }
}
return work_generated_;
}
@@ -47,18 +50,21 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
if (blocking_dependencies_met_ && !work_generated_) {
work_generated_ = true;
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
return work_generated_;
}
void DestroyAggregationStateWorkOrder::execute() {
- query_context_->destroyAggregationState(aggr_state_index_);
+ query_context_->destroyAggregationState(aggr_state_index_, part_id_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/DestroyAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
index 70ab45c..990160f 100644
--- a/relational_operators/DestroyAggregationStateOperator.hpp
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -22,6 +22,7 @@
#include <string>
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -53,12 +54,16 @@ class DestroyAggregationStateOperator : public RelationalOperator {
*
* @param query_id The ID of the query to which this operator belongs.
* @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param num_partitions The number of partitions of the aggregation's input
+ * relation. If the relation is not partitioned, it is one.
**/
DestroyAggregationStateOperator(
const std::size_t query_id,
- const QueryContext::aggregation_state_id aggr_state_index)
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t num_partitions)
: RelationalOperator(query_id),
aggr_state_index_(aggr_state_index),
+ num_partitions_(num_partitions),
work_generated_(false) {}
~DestroyAggregationStateOperator() override {}
@@ -81,6 +86,7 @@ class DestroyAggregationStateOperator : public RelationalOperator {
private:
const QueryContext::aggregation_state_id aggr_state_index_;
+ const std::size_t num_partitions_;
bool work_generated_;
DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
@@ -96,14 +102,17 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param part_id The partition id.
* @param query_context The QueryContext to use.
**/
DestroyAggregationStateWorkOrder(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const partition_id part_id,
QueryContext *query_context)
: WorkOrder(query_id),
aggr_state_index_(aggr_state_index),
+ part_id_(part_id),
query_context_(DCHECK_NOTNULL(query_context)) {}
~DestroyAggregationStateWorkOrder() override {}
@@ -112,6 +121,7 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
private:
const QueryContext::aggregation_state_id aggr_state_index_;
+ const partition_id part_id_;
QueryContext *query_context_;
DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 77b4b97..14db825 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -19,6 +19,9 @@
#include "relational_operators/FinalizeAggregationOperator.hpp"
+#include <cstddef>
+
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
@@ -41,19 +44,23 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !started_) {
started_ = true;
- AggregationOperationState *agg_state =
- query_context->getAggregationState(aggr_state_index_);
- DCHECK(agg_state != nullptr);
- for (std::size_t part_id = 0;
- part_id < agg_state->getNumFinalizationPartitions();
- ++part_id) {
- container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(
- query_id_,
- part_id,
- agg_state,
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_, part_id);
+ DCHECK(agg_state != nullptr);
+ for (std::size_t state_part_id = 0;
+ state_part_id < agg_state->getNumFinalizationPartitions();
+ ++state_part_id) {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ part_id,
+ state_part_id,
+ agg_state,
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ }
}
}
return started_;
@@ -66,21 +73,28 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
if (blocking_dependencies_met_ && !started_) {
started_ = true;
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
- aggr_state_index_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
- output_destination_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+ aggr_state_index_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+ part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+ 0u);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+ output_destination_index_);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
return started_;
}
void FinalizeAggregationWorkOrder::execute() {
- state_->finalizeAggregate(partition_id_, output_destination_);
+ (void) part_id_;
+ state_->finalizeAggregate(state_partition_id_, output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 87533af..5210de2 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -59,6 +59,8 @@ class FinalizeAggregationOperator : public RelationalOperator {
*
* @param query_id The ID of the query to which this operator belongs.
* @param aggr_state_index The index of the AggregationState in QueryContext.
+ * @param num_partitions The number of partitions of the aggregation's input
+ * relation. If the relation is not partitioned, it is one.
* @param output_relation The output relation.
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert aggregation results.
@@ -66,10 +68,12 @@ class FinalizeAggregationOperator : public RelationalOperator {
FinalizeAggregationOperator(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t num_partitions,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
aggr_state_index_(aggr_state_index),
+ num_partitions_(num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
started_(false) {}
@@ -102,6 +106,7 @@ class FinalizeAggregationOperator : public RelationalOperator {
private:
const QueryContext::aggregation_state_id aggr_state_index_;
+ const std::size_t num_partitions_;
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
bool started_;
@@ -120,18 +125,21 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @note InsertWorkOrder takes ownership of \c state.
*
* @param query_id The ID of the query to which this operator belongs.
- * @param partition_id The partition ID for which the Finalize aggregation
- * work order is issued.
+ * @param part_id The partition ID used by 'output_destination'.
+ * @param state_partition_id The partition ID for which the Finalize
+ * aggregation work order is issued.
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
- const std::size_t partition_id,
+ const std::size_t part_id,
+ const std::size_t state_partition_id,
AggregationOperationState *state,
InsertDestination *output_destination)
: WorkOrder(query_id),
- partition_id_(partition_id),
+ part_id_(part_id),
+ state_partition_id_(state_partition_id),
state_(DCHECK_NOTNULL(state)),
output_destination_(DCHECK_NOTNULL(output_destination)) {}
@@ -140,7 +148,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
void execute() override;
private:
- const std::size_t partition_id_;
+ const std::size_t part_id_, state_partition_id_;
AggregationOperationState *state_;
InsertDestination *output_destination_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index b1063ad..e197b08 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -40,18 +40,20 @@ bool InitializeAggregationOperator::getAllWorkOrders(
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
if (!started_) {
- AggregationOperationState *agg_state =
- query_context->getAggregationState(aggr_state_index_);
- DCHECK(agg_state != nullptr);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_, part_id);
+ DCHECK(agg_state != nullptr);
- for (std::size_t part_id = 0;
- part_id < agg_state->getNumInitializationPartitions();
- ++part_id) {
- container->addNormalWorkOrder(
- new InitializeAggregationWorkOrder(query_id_,
- part_id,
- agg_state),
- op_index_);
+ for (std::size_t state_part_id = 0;
+ state_part_id < agg_state->getNumInitializationPartitions();
+ ++state_part_id) {
+ container->addNormalWorkOrder(
+ new InitializeAggregationWorkOrder(query_id_,
+ state_part_id,
+ agg_state),
+ op_index_);
+ }
}
started_ = true;
}
@@ -63,10 +65,28 @@ bool InitializeAggregationOperator::getAllWorkOrders(
// initialization with the distributed version.
bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
LOG(FATAL) << "Not supported";
+
+ if (started_) {
+ return true;
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+ started_ = true;
+ return true;
}
void InitializeAggregationWorkOrder::execute() {
- state_->initialize(partition_id_);
+ state_->initialize(state_partition_id_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index e81264a..0a9d25d 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -56,11 +56,15 @@ class InitializeAggregationOperator : public RelationalOperator {
*
* @param query_id The ID of this query.
* @param aggr_state_index The index of the AggregationOperationState in QueryContext.
+ * @param num_partitions The number of partitions in the aggregation's input
+ * relation. If the relation is not partitioned, it is one.
**/
InitializeAggregationOperator(const std::size_t query_id,
- const QueryContext::aggregation_state_id aggr_state_index)
+ const QueryContext::aggregation_state_id aggr_state_index,
+ const std::size_t num_partitions)
: RelationalOperator(query_id),
aggr_state_index_(aggr_state_index),
+ num_partitions_(num_partitions),
started_(false) {}
~InitializeAggregationOperator() override {}
@@ -83,6 +87,7 @@ class InitializeAggregationOperator : public RelationalOperator {
private:
const QueryContext::aggregation_state_id aggr_state_index_;
+ const std::size_t num_partitions_;
bool started_;
DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
@@ -97,14 +102,14 @@ class InitializeAggregationWorkOrder : public WorkOrder {
* @brief Constructor.
*
* @param query_id The ID of the query to which this operator belongs.
- * @param partition_id The partition ID for which the work order is issued.
+ * @param state_partition_id The partition ID for which the work order is issued.
* @param state The AggregationOperationState to be initialized.
*/
InitializeAggregationWorkOrder(const std::size_t query_id,
- const std::size_t partition_id,
+ const std::size_t state_partition_id,
AggregationOperationState *state)
: WorkOrder(query_id),
- partition_id_(partition_id),
+ state_partition_id_(state_partition_id),
state_(DCHECK_NOTNULL(state)) {}
~InitializeAggregationWorkOrder() override {}
@@ -112,7 +117,7 @@ class InitializeAggregationWorkOrder : public WorkOrder {
void execute() override;
private:
- const std::size_t partition_id_;
+ const std::size_t state_partition_id_;
AggregationOperationState *state_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 99b4507..18f0589 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -21,7 +21,7 @@ package quickstep.serialization;
import "relational_operators/SortMergeRunOperator.proto";
-// Next tag: 25.
+// Next tag: 26.
enum WorkOrderType {
AGGREGATION = 1;
BUILD_AGGREGATION_EXISTENCE_MAP = 23;
@@ -35,6 +35,7 @@ enum WorkOrderType {
DROP_TABLE = 8;
FINALIZE_AGGREGATION = 9;
HASH_JOIN = 10;
+ INITIALIZE_AGGREGATION = 25;
INSERT = 11;
NESTED_LOOP_JOIN = 12;
SAMPLE = 13;
@@ -58,10 +59,12 @@ message WorkOrder {
extensions 16 to max;
}
+// Next tag: 21.
message AggregationWorkOrder {
extend WorkOrder {
// All required.
optional uint32 aggr_state_index = 16;
+ optional uint64 partition_id = 20;
optional fixed64 block_id = 17;
optional int32 lip_deployment_index = 18;
repeated uint32 lip_filter_indexes = 19;
@@ -74,6 +77,7 @@ message BuildAggregationExistenceMapWorkOrder {
optional fixed64 build_block_id = 369;
optional int32 build_attribute = 370;
optional uint32 aggr_state_index = 371;
+ optional uint64 partition_id = 372;
}
}
@@ -118,6 +122,7 @@ message DeleteWorkOrder {
message DestroyAggregationStateWorkOrder {
extend WorkOrder {
optional uint32 aggr_state_index = 352;
+ optional uint64 partition_id = 353;
}
}
@@ -138,10 +143,13 @@ message DropTableWorkOrder {
}
}
+// Next tag: 148.
message FinalizeAggregationWorkOrder {
extend WorkOrder {
// All required.
optional uint32 aggr_state_index = 144;
+ optional uint64 partition_id = 146;
+ optional uint64 state_partition_id = 147;
optional int32 insert_destination_index = 145;
}
}
@@ -178,6 +186,15 @@ message HashJoinWorkOrder {
}
}
+message InitializeAggregationWorkOrder {
+ extend WorkOrder {
+ // All required.
+ optional uint32 aggr_state_index = 400;
+ optional uint64 partition_id = 401;
+ optional uint64 state_partition_id = 402;
+ }
+}
+
message InsertWorkOrder {
extend WorkOrder {
// All required.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 4c1ffa9..48bf956 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -38,6 +38,7 @@
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
#include "relational_operators/InsertOperator.hpp"
#include "relational_operators/NestedLoopsJoinOperator.hpp"
#include "relational_operators/SampleOperator.hpp"
@@ -88,18 +89,24 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
switch (proto.work_order_type()) {
case serialization::AGGREGATION: {
- LOG(INFO) << "Creating AggregationWorkOrder for Query " << query_id
+ const partition_id part_id =
+ proto.GetExtension(serialization::AggregationWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating AggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
return new AggregationWorkOrder(
query_id,
proto.GetExtension(serialization::AggregationWorkOrder::block_id),
query_context->getAggregationState(
- proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
+ proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index), part_id),
CreateLIPFilterAdaptiveProberHelper(
proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
}
case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
- LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << query_id
+ const partition_id part_id =
+ proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
return new BuildAggregationExistenceMapWorkOrder(
@@ -109,7 +116,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
query_context->getAggregationState(
- proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
+ proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index), part_id),
storage_manager);
}
case serialization::BUILD_LIP_FILTER: {
@@ -171,12 +178,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
bus);
}
case serialization::DESTROY_AGGREGATION_STATE: {
- LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << query_id
+ const partition_id part_id =
+ proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating DestroyAggregationStateWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
return new DestroyAggregationStateWorkOrder(
query_id,
proto.GetExtension(
serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+ part_id,
query_context);
}
case serialization::DESTROY_HASH: {
@@ -210,15 +221,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
catalog_database);
}
case serialization::FINALIZE_AGGREGATION: {
- LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << query_id
+ const partition_id part_id =
+ proto.GetExtension(serialization::FinalizeAggregationWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
// TODO(quickstep-team): Handle inner-table partitioning in the distributed
// setting.
return new FinalizeAggregationWorkOrder(
query_id,
- 0uL /* partition_id */,
+ part_id,
+ proto.GetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id),
query_context->getAggregationState(proto.GetExtension(
- serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
+ serialization::FinalizeAggregationWorkOrder::aggr_state_index), part_id),
query_context->getInsertDestination(
proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
insert_destination_index)));
@@ -354,6 +369,20 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto";
}
}
+ case serialization::INITIALIZE_AGGREGATION: {
+ const partition_id part_id =
+ proto.GetExtension(serialization::InitializeAggregationWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating InitializeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
+ << " in Shiftboss " << shiftboss_index;
+ AggregationOperationState *aggr_state =
+ query_context->getAggregationState(
+ proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index), part_id);
+ return new InitializeAggregationWorkOrder(query_id,
+ proto.GetExtension(
+ serialization::InitializeAggregationWorkOrder::state_partition_id),
+ aggr_state);
+ }
case serialization::INSERT: {
LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
return new InsertWorkOrder(
@@ -578,8 +607,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&
proto.HasExtension(serialization::AggregationWorkOrder::aggr_state_index) &&
+ proto.HasExtension(serialization::AggregationWorkOrder::partition_id) &&
query_context.isValidAggregationStateId(
- proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
+ proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index),
+ proto.GetExtension(serialization::AggregationWorkOrder::partition_id));
}
case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
if (!proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)) {
@@ -601,8 +632,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
return proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id) &&
proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index) &&
+ proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id) &&
query_context.isValidAggregationStateId(
- proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index));
+ proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index),
+ proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id));
}
case serialization::BUILD_HASH: {
if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
@@ -680,8 +713,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
}
case serialization::DESTROY_AGGREGATION_STATE: {
return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
+ proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::partition_id) &&
query_context.isValidAggregationStateId(
- proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index));
+ proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+ proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id));
}
case serialization::DESTROY_HASH: {
return proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) &&
@@ -695,8 +730,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
}
case serialization::FINALIZE_AGGREGATION: {
return proto.HasExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index) &&
+ proto.HasExtension(serialization::FinalizeAggregationWorkOrder::partition_id) &&
query_context.isValidAggregationStateId(
- proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)) &&
+ proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index),
+ proto.GetExtension(serialization::FinalizeAggregationWorkOrder::partition_id)) &&
+ proto.HasExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id) &&
proto.HasExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index) &&
query_context.isValidInsertDestinationId(
proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index));
@@ -775,6 +813,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
}
+ case serialization::INITIALIZE_AGGREGATION: {
+ return proto.HasExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index) &&
+ proto.HasExtension(serialization::InitializeAggregationWorkOrder::partition_id) &&
+ query_context.isValidAggregationStateId(
+ proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index),
+ proto.GetExtension(serialization::InitializeAggregationWorkOrder::partition_id)) &&
+ proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id);
+ }
case serialization::INSERT: {
return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
query_context.isValidInsertDestinationId(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 6881dea..0690b6b 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -82,6 +82,7 @@ namespace quickstep {
namespace {
constexpr std::size_t kQueryId = 0;
constexpr int kOpIndex = 0;
+constexpr std::size_t kNumPartitions = 1u;
} // namespace
class Type;
@@ -234,7 +235,8 @@ class AggregationOperatorTest : public ::testing::Test {
query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
- serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
+ serialization::AggregationOperationState *aggr_state_proto =
+ query_context_proto.add_aggregation_states()->mutable_aggregation_state();
aggr_state_proto->set_relation_id(table_->getID());
// Add an aggregate.
@@ -276,7 +278,7 @@ class AggregationOperatorTest : public ::testing::Test {
aggr_state_proto->set_estimated_num_entries(estimated_entries);
// Create Operators.
- op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));
+ op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index, kNumPartitions));
// Setup the InsertDestination proto in the query context proto.
const QueryContext::insert_destination_id insert_destination_index =
@@ -290,11 +292,12 @@ class AggregationOperatorTest : public ::testing::Test {
finalize_op_.reset(
new FinalizeAggregationOperator(kQueryId,
aggr_state_index,
+ kNumPartitions,
*result_table_,
insert_destination_index));
destroy_aggr_state_op_.reset(
- new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+ new DestroyAggregationStateOperator(kQueryId, aggr_state_index, kNumPartitions));
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,
@@ -331,7 +334,8 @@ class AggregationOperatorTest : public ::testing::Test {
query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
- serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
+ serialization::AggregationOperationState *aggr_state_proto =
+ query_context_proto.add_aggregation_states()->mutable_aggregation_state();
aggr_state_proto->set_relation_id(table_->getID());
// Add an aggregate.
@@ -368,7 +372,7 @@ class AggregationOperatorTest : public ::testing::Test {
serialization::HashTableImplType::SEPARATE_CHAINING);
// Create Operators.
- op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));
+ op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index, kNumPartitions));
// Setup the InsertDestination proto in the query context proto.
const QueryContext::insert_destination_id insert_destination_index =
@@ -382,11 +386,12 @@ class AggregationOperatorTest : public ::testing::Test {
finalize_op_.reset(
new FinalizeAggregationOperator(kQueryId,
aggr_state_index,
+ kNumPartitions,
*result_table_,
insert_destination_index));
destroy_aggr_state_op_.reset(
- new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+ new DestroyAggregationStateOperator(kQueryId, aggr_state_index, kNumPartitions));
// Set up the QueryContext.
query_context_.reset(new QueryContext(query_context_proto,