You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/20 17:58:11 UTC
[28/28] incubator-quickstep git commit: Parallel work order generation support.
Parallel work order generation support.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc524a97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc524a97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc524a97
Branch: refs/heads/partitioned-aggregation
Commit: bc524a97375edbfd748945c69cf9b2928c4944a1
Parents: 3b5b8a9
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Aug 19 10:35:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 20 12:57:06 2016 -0500
----------------------------------------------------------------------
.../FinalizeAggregationOperator.cpp | 35 ++++++++++++++++----
.../FinalizeAggregationOperator.hpp | 10 ++++--
2 files changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc524a97/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !started_) {
started_ = true;
- container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(
- query_id_,
- query_context->getAggregationState(aggr_state_index_),
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
+ DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+ if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) {
+ // The same AggregationOperationState is shared across all the WorkOrders; each WorkOrder finalizes one partition.
+ for (std::size_t part_id = 0;
+ part_id < query_context->getAggregationState(aggr_state_index_)
+ ->getNumPartitions();
+ ++part_id) {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ query_context->getAggregationState(aggr_state_index_),
+ query_context->getInsertDestination(output_destination_index_),
+ static_cast<int>(part_id)),
+ op_index_);
+ }
+ } else {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ query_context->getAggregationState(aggr_state_index_),
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ }
}
return started_;
}
@@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
void FinalizeAggregationWorkOrder::execute() {
- state_->finalizeAggregate(output_destination_);
+ if (state_->isAggregatePartitioned()) {
+ state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+ } else {
+ state_->finalizeAggregate(output_destination_);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc524a97/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..7517d58 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,18 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
+ * @param part_id The ID of the partition that this work order finalizes.
+ * Ignored if the aggregation is not partitioned (the default
+ * value is -1).
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
AggregationOperationState *state,
- InsertDestination *output_destination)
+ InsertDestination *output_destination,
+ int part_id = -1)
: WorkOrder(query_id),
state_(DCHECK_NOTNULL(state)),
- output_destination_(DCHECK_NOTNULL(output_destination)) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ part_id_(part_id) {}
~FinalizeAggregationWorkOrder() override {}
@@ -134,6 +139,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
private:
AggregationOperationState *state_;
InsertDestination *output_destination_;
+ const int part_id_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};