You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/10/12 20:15:01 UTC
incubator-quickstep git commit: Added Vector Aggregation support in
the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 8f094a1c0 -> e79b520ec
Added Vector Aggregation support in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e79b520e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e79b520e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e79b520e
Branch: refs/heads/master
Commit: e79b520ec919fbe101ad72978c02216eeeeb6ca6
Parents: 8f094a1
Author: Zuyu Zhang <zu...@cs.wisc.edu>
Authored: Fri Aug 4 17:03:34 2017 -0500
Committer: Zuyu Zhang <zu...@cs.wisc.edu>
Committed: Thu Oct 12 11:44:44 2017 -0500
----------------------------------------------------------------------
.../FinalizeAggregationOperator.cpp | 31 ++++++++++----------
.../InitializeAggregationOperator.cpp | 23 +++++++--------
relational_operators/WorkOrderFactory.cpp | 2 --
3 files changed, 27 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 68d0ef4..92fc7f6 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -67,28 +67,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
return true;
}
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// finalization with the distributed version.
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (started_) {
return true;
}
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
- aggr_state_index_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
- part_id);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
- 0u);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
- output_destination_index_);
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_partitions_;
+ ++state_part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+ aggr_state_index_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+ part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+ state_part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+ output_destination_index_);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
started_ = true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 39a6fb4..89dfd7e 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -64,26 +64,25 @@ bool InitializeAggregationOperator::getAllWorkOrders(
return true;
}
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// initialization with the distributed version.
bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- LOG(FATAL) << "Not supported";
-
if (started_) {
return true;
}
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_init_partitions_;
+ ++state_part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
started_ = true;
return true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 25cc81a..3a991bd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -237,8 +237,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
- // TODO(quickstep-team): Handle inner-table partitioning in the distributed
- // setting.
return new FinalizeAggregationWorkOrder(
query_id,
part_id,