You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/06 20:16:45 UTC
[68/73] [abbrv] incubator-quickstep git commit: Parallel work order generation support.
Parallel work order generation support.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4064d39e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4064d39e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4064d39e
Branch: refs/heads/partitioned-aggregation
Commit: 4064d39eb9d8f9f13062172590846dfcd372acef
Parents: 650c0b3
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Aug 19 10:35:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 6 15:01:33 2016 -0500
----------------------------------------------------------------------
.../FinalizeAggregationOperator.cpp | 35 ++++++++++++++++----
.../FinalizeAggregationOperator.hpp | 12 +++++--
2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4064d39e/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 65e62c4..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !started_) {
started_ = true;
- container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(
- query_id_,
- query_context->releaseAggregationState(aggr_state_index_),
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
+ DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+ if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) {
+ // The same AggregationState is shared across all the WorkOrders.
+ for (std::size_t part_id = 0;
+ part_id < query_context->getAggregationState(aggr_state_index_)
+ ->getNumPartitions();
+ ++part_id) {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ query_context->getAggregationState(aggr_state_index_),
+ query_context->getInsertDestination(output_destination_index_),
+ static_cast<int>(part_id)),
+ op_index_);
+ }
+ } else {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ query_context->getAggregationState(aggr_state_index_),
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ }
}
return started_;
}
@@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
void FinalizeAggregationWorkOrder::execute() {
- state_->finalizeAggregate(output_destination_);
+ if (state_->isAggregatePartitioned()) {
+ state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+ } else {
+ state_->finalizeAggregate(output_destination_);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4064d39e/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 7ac6712..32f10d7 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -120,21 +120,27 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
+ * @param part_id The partition ID for which the Finalize aggregation work
+ * order is issued. Ignore this field if the aggregation is not
+ * partitioned.
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
AggregationOperationState *state,
- InsertDestination *output_destination)
+ InsertDestination *output_destination,
+ int part_id = -1)
: WorkOrder(query_id),
state_(DCHECK_NOTNULL(state)),
- output_destination_(DCHECK_NOTNULL(output_destination)) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ part_id_(part_id) {}
~FinalizeAggregationWorkOrder() override {}
void execute() override;
private:
- std::unique_ptr<AggregationOperationState> state_;
+ AggregationOperationState *state_;
InsertDestination *output_destination_;
+ const int part_id_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};