You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/06 20:16:45 UTC

[68/73] [abbrv] incubator-quickstep git commit: Parallel work order generation support.

Parallel work order generation support.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4064d39e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4064d39e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4064d39e

Branch: refs/heads/partitioned-aggregation
Commit: 4064d39eb9d8f9f13062172590846dfcd372acef
Parents: 650c0b3
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Aug 19 10:35:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 6 15:01:33 2016 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             | 35 ++++++++++++++++----
 .../FinalizeAggregationOperator.hpp             | 12 +++++--
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4064d39e/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 65e62c4..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    container->addNormalWorkOrder(
-        new FinalizeAggregationWorkOrder(
-            query_id_,
-            query_context->releaseAggregationState(aggr_state_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
+    DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+    if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) {
+      // The same AggregationState is shared across all the WorkOrders.
+      for (std::size_t part_id = 0;
+           part_id < query_context->getAggregationState(aggr_state_index_)
+                         ->getNumPartitions();
+           ++part_id) {
+        container->addNormalWorkOrder(
+            new FinalizeAggregationWorkOrder(
+                query_id_,
+                query_context->getAggregationState(aggr_state_index_),
+                query_context->getInsertDestination(output_destination_index_),
+                static_cast<int>(part_id)),
+            op_index_);
+      }
+    } else {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              query_context->getAggregationState(aggr_state_index_),
+              query_context->getInsertDestination(output_destination_index_)),
+          op_index_);
+    }
   }
   return started_;
 }
@@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 
 
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(output_destination_);
+  if (state_->isAggregatePartitioned()) {
+    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+  } else {
+    state_->finalizeAggregate(output_destination_);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4064d39e/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 7ac6712..32f10d7 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -120,21 +120,27 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
+   * @param part_id The partition ID for which the Finalize aggregation work
+   *        order is issued. Ignore this field if the aggregation is not
+   *        partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination)
+                               InsertDestination *output_destination,
+                               int part_id = -1)
       : WorkOrder(query_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        part_id_(part_id) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
   void execute() override;
 
  private:
-  std::unique_ptr<AggregationOperationState> state_;
+  AggregationOperationState *state_;
   InsertDestination *output_destination_;
+  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };