You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/11/17 19:40:58 UTC

[18/27] incubator-quickstep git commit: Added Vector Aggregation support in the distributed version.

Added Vector Aggregation support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e79b520e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e79b520e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e79b520e

Branch: refs/heads/trace
Commit: e79b520ec919fbe101ad72978c02216eeeeb6ca6
Parents: 8f094a1
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Fri Aug 4 17:03:34 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Thu Oct 12 11:44:44 2017 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             | 31 ++++++++++----------
 .../InitializeAggregationOperator.cpp           | 23 +++++++--------
 relational_operators/WorkOrderFactory.cpp       |  2 --
 3 files changed, 27 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 68d0ef4..92fc7f6 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -67,28 +67,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// finalization with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                        aggr_state_index_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
-                        part_id);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
-                        0u);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                        output_destination_index_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+                          aggr_state_index_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+                          part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+                          state_part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+                          output_destination_index_);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
 
   started_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 39a6fb4..89dfd7e 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -64,26 +64,25 @@ bool InitializeAggregationOperator::getAllWorkOrders(
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// initialization with the distributed version.
 bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  LOG(FATAL) << "Not supported";
-
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_init_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
 
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   started_ = true;
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 25cc81a..3a991bd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -237,8 +237,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
-      // TODO(quickstep-team): Handle inner-table partitioning in the distributed
-      // setting.
       return new FinalizeAggregationWorkOrder(
           query_id,
           part_id,