You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 00:31:48 UTC

incubator-quickstep git commit: Added the support for partitioned (but not parallel) aggregations.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 361a65fa6 -> e6ac59d5a


Added the support for partitioned (but not parallel) aggregations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e6ac59d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e6ac59d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e6ac59d5

Branch: refs/heads/master
Commit: e6ac59d5a966f23a4fad3d907be7cc9b4ba53820
Parents: 361a65f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri May 26 02:08:57 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 19:00:28 2017 -0500

----------------------------------------------------------------------
 query_execution/BlockLocator.hpp                |  2 +
 query_execution/ForemanDistributed.cpp          | 14 +++-
 query_execution/PolicyEnforcerDistributed.cpp   |  2 +
 query_execution/PolicyEnforcerDistributed.hpp   |  2 +
 query_execution/QueryContext.cpp                | 24 +++++--
 query_execution/QueryContext.hpp                | 26 +++++---
 query_execution/QueryContext.proto              |  7 +-
 query_execution/QueryManagerDistributed.cpp     |  5 +-
 query_execution/QueryManagerDistributed.hpp     | 16 +++--
 query_optimizer/ExecutionGenerator.cpp          | 57 ++++++++++++----
 relational_operators/AggregationOperator.cpp    | 62 ++++++++++-------
 relational_operators/AggregationOperator.hpp    | 38 ++++++++---
 .../BuildAggregationExistenceMapOperator.cpp    | 64 +++++++++++-------
 .../BuildAggregationExistenceMapOperator.hpp    | 37 ++++++++---
 relational_operators/CMakeLists.txt             |  4 ++
 .../DestroyAggregationStateOperator.cpp         | 24 ++++---
 .../DestroyAggregationStateOperator.hpp         | 12 +++-
 .../FinalizeAggregationOperator.cpp             | 58 ++++++++++------
 .../FinalizeAggregationOperator.hpp             | 18 +++--
 .../InitializeAggregationOperator.cpp           | 44 ++++++++----
 .../InitializeAggregationOperator.hpp           | 15 +++--
 relational_operators/WorkOrder.proto            | 19 +++++-
 relational_operators/WorkOrderFactory.cpp       | 70 ++++++++++++++++----
 .../tests/AggregationOperator_unittest.cpp      | 17 +++--
 24 files changed, 457 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index 82c28ae..01492f9 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -123,6 +123,8 @@ class BlockLocator : public Thread {
    * @return Whether the block locality info has found.
    **/
   bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const {
+    if (block == kInvalidBlockId) { return false; }
+
     const std::unordered_set<block_id_domain> block_domains = getBlockDomains(block);
     if (!block_domains.empty()) {
       // NOTE(zuyu): This lock is held for the rest duration of this call, as the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 34b5b76..49f2101 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,12 +243,14 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
                                                        size_t *shiftboss_index_for_aggregation) {
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::aggregation_state_id aggr_state_index;
+  partition_id part_id;
   vector<QueryContext::lip_filter_id> lip_filter_indexes;
   block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+      part_id = work_order_proto.GetExtension(S::AggregationWorkOrder::partition_id);
 
       for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
         lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
@@ -256,18 +258,28 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
 
       block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
       break;
+    case S::BUILD_AGGREGATION_EXISTENCE_MAP:
+      aggr_state_index = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::aggr_state_index);
+      part_id = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::partition_id);
+      break;
+    case S::INITIALIZE_AGGREGATION:
+      aggr_state_index = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::aggr_state_index);
+      part_id = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::partition_id);
+      break;
     case S::FINALIZE_AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
+      part_id = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::partition_id);
       break;
     case S::DESTROY_AGGREGATION_STATE:
       aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
+      part_id = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::partition_id);
       break;
     default:
       return false;
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
-      proto.query_id(), aggr_state_index, lip_filter_indexes, block_locator_, block,
+      proto.query_id(), aggr_state_index, part_id, lip_filter_indexes, block_locator_, block,
       next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
 
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 7e9a81d..766c351 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,6 +192,7 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
     const std::size_t query_id,
     const QueryContext::aggregation_state_id aggr_state_index,
+    const partition_id part_id,
     const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
     const BlockLocator &block_locator,
     const block_id block,
@@ -200,6 +201,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 part_id,
                                                  lip_filter_indexes,
                                                  block_locator,
                                                  block,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index e24f8cf..23b0017 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -133,6 +133,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    *
    * @param query_id The query id.
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param part_id The partition ID.
    * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
@@ -142,6 +143,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void getShiftbossIndexForAggregation(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const partition_id part_id,
       const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
       const BlockLocator &block_locator,
       const block_id block,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 8ba77ab..52aa4dc 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -64,10 +64,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
       << proto.DebugString();
 
   for (int i = 0; i < proto.aggregation_states_size(); ++i) {
-    aggregation_states_.emplace_back(
-        AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
-                                                        database,
-                                                        storage_manager));
+    PartitionedAggregationOperationStates partitioned_aggregation_states;
+    const serialization::QueryContext::AggregationOperationStateContext &aggr_state_context_proto =
+        proto.aggregation_states(i);
+    for (std::uint64_t j = 0; j < aggr_state_context_proto.num_partitions(); ++j) {
+      partitioned_aggregation_states.emplace_back(
+          AggregationOperationState::ReconstructFromProto(aggr_state_context_proto.aggregation_state(),
+                                                          database,
+                                                          storage_manager));
+    }
+    aggregation_states_.push_back(move(partitioned_aggregation_states));
   }
 
   for (int i = 0; i < proto.generator_functions_size(); ++i) {
@@ -167,7 +173,7 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
 bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
                                 const CatalogDatabaseLite &database) {
   for (int i = 0; i < proto.aggregation_states_size(); ++i) {
-    if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i), database)) {
+    if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i).aggregation_state(), database)) {
       return false;
     }
   }
@@ -293,8 +299,12 @@ std::size_t QueryContext::getAggregationStatesMemoryBytes() const {
   for (std::size_t agg_state_id = 0;
        agg_state_id < aggregation_states_.size();
        ++agg_state_id) {
-    if (aggregation_states_[agg_state_id] != nullptr) {
-      memory += aggregation_states_[agg_state_id]->getMemoryConsumptionBytes();
+    for (std::size_t part_id = 0;
+         part_id < aggregation_states_[agg_state_id].size();
+         ++part_id) {
+      if (aggregation_states_[agg_state_id][part_id] != nullptr) {
+        memory += aggregation_states_[agg_state_id][part_id]->getMemoryConsumptionBytes();
+      }
     }
   }
   return memory;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index ebc9506..7876821 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -169,38 +169,44 @@ class QueryContext {
    * @brief Whether the given AggregationOperationState id is valid.
    *
    * @param id The AggregationOperationState id.
+   * @param part_id The partition id.
    *
    * @return True if valid, otherwise false.
    **/
-  bool isValidAggregationStateId(const aggregation_state_id id) const {
+  bool isValidAggregationStateId(const aggregation_state_id id, const partition_id part_id) const {
     SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
-    return id < aggregation_states_.size();
+    return id < aggregation_states_.size() &&
+           part_id < aggregation_states_[id].size();
   }
 
   /**
    * @brief Get the AggregationOperationState.
    *
    * @param id The AggregationOperationState id in the query.
+   * @param part_id The partition id.
    *
    * @return The AggregationOperationState, alreadly created in the constructor.
    **/
-  inline AggregationOperationState* getAggregationState(const aggregation_state_id id) {
+  inline AggregationOperationState* getAggregationState(const aggregation_state_id id, const partition_id part_id) {
     SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
     DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    return aggregation_states_[id].get();
+    DCHECK_LT(part_id, aggregation_states_[id].size());
+    DCHECK(aggregation_states_[id][part_id]);
+    return aggregation_states_[id][part_id].get();
   }
 
   /**
    * @brief Destroy the given aggregation state.
    *
    * @param id The ID of the AggregationOperationState to destroy.
+   * @param part_id The partition id.
    **/
-  inline void destroyAggregationState(const aggregation_state_id id) {
+  inline void destroyAggregationState(const aggregation_state_id id, const partition_id part_id) {
     SpinSharedMutexExclusiveLock<false> lock(aggregation_states_mutex_);
     DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    aggregation_states_[id].reset(nullptr);
+    DCHECK_LT(part_id, aggregation_states_[id].size());
+    DCHECK(aggregation_states_[id][part_id]);
+    aggregation_states_[id][part_id].reset(nullptr);
   }
 
   /**
@@ -611,10 +617,12 @@ class QueryContext {
            part_id < join_hash_tables_[id].size();
   }
 
+  // Per AggregationOperationState, the index is the partition id.
+  typedef std::vector<std::unique_ptr<AggregationOperationState>> PartitionedAggregationOperationStates;
   // Per hash join, the index is the partition id.
   typedef std::vector<std::unique_ptr<JoinHashTable>> PartitionedJoinHashTables;
 
-  std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
+  std::vector<PartitionedAggregationOperationStates> aggregation_states_;
   std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
   std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
   std::vector<PartitionedJoinHashTables> join_hash_tables_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index 599daa7..b76374c 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -30,6 +30,11 @@ import "utility/SortConfiguration.proto";
 import "utility/lip_filter/LIPFilter.proto";
 
 message QueryContext {
+  message AggregationOperationStateContext {
+    required AggregationOperationState aggregation_state = 1;
+    optional uint64 num_partitions = 2 [default = 1];
+  }
+
   message HashTableContext {
     required HashTable join_hash_table = 1;
     optional uint64 num_partitions = 2 [default = 1];
@@ -50,7 +55,7 @@ message QueryContext {
     repeated UpdateAssignment update_assignments = 2;
   }
 
-  repeated AggregationOperationState aggregation_states = 1;
+  repeated AggregationOperationStateContext aggregation_states = 1;
   repeated GeneratorFunctionHandle generator_functions = 2;
   repeated HashTableContext join_hash_tables = 3;
   repeated InsertDestination insert_destinations = 4;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6c293a5..77a605e 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -76,7 +76,10 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
   }
 
   const serialization::QueryContext &query_context_proto = query_handle->getQueryContextProto();
-  shiftboss_indexes_for_aggrs_.resize(query_context_proto.aggregation_states_size(), kInvalidShiftbossIndex);
+  for (int i = 0; i < query_context_proto.aggregation_states_size(); ++i) {
+    shiftboss_indexes_for_aggrs_.push_back(
+        vector<size_t>(query_context_proto.aggregation_states(i).num_partitions(), kInvalidShiftbossIndex));
+  }
 
   for (int i = 0; i < query_context_proto.join_hash_tables_size(); ++i) {
     shiftboss_indexes_for_hash_joins_.push_back(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 6d454cc..6490eb7 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -105,6 +105,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * <next_shiftboss_index_to_schedule>.
    *
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param part_id The partition ID.
    * @param lip_filter_indexes The LIP filter indexes.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
@@ -112,21 +113,24 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const partition_id part_id,
                                        const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
                                        const BlockLocator &block_locator,
                                        const block_id block,
                                        const std::size_t next_shiftboss_index_to_schedule,
                                        std::size_t *shiftboss_index) {
     DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
-    if (shiftboss_indexes_for_aggrs_[aggr_state_index] != kInvalidShiftbossIndex) {
-      *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+    DCHECK_LT(part_id, shiftboss_indexes_for_aggrs_[aggr_state_index].size());
+
+    if (shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] != kInvalidShiftbossIndex) {
+      *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index][part_id];
       return;
     }
 
     getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
                             shiftboss_index);
 
-    shiftboss_indexes_for_aggrs_[aggr_state_index] = *shiftboss_index;
+    shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] = *shiftboss_index;
   }
 
   /**
@@ -259,9 +263,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
 
-  // From an aggregation id (QueryContext::aggregation_state_id) to its
-  // scheduled Shiftboss index.
-  std::vector<std::size_t> shiftboss_indexes_for_aggrs_;
+  // Get the scheduled Shiftboss index given
+  // [QueryContext::aggregation_state_id][partition_id].
+  std::vector<std::vector<std::size_t>> shiftboss_indexes_for_aggrs_;
 
   // Get the scheduled Shiftboss index given
   // [QueryContext::join_hash_table_id][partition_id].

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2dbcf16..3b2fe08 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1580,14 +1580,25 @@ void ExecutionGenerator::convertUpdateTable(
 
 void ExecutionGenerator::convertAggregate(
     const P::AggregatePtr &physical_plan) {
+  const CatalogRelationInfo *input_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->input());
+  const CatalogRelation *input_relation = input_relation_info->relation;
+  const PartitionScheme *input_partition_scheme = input_relation->getPartitionScheme();
+  const size_t num_partitions =
+      input_partition_scheme
+          ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+          : 1u;
+
   // Create aggr state proto.
   const QueryContext::aggregation_state_id aggr_state_index =
       query_context_proto_->aggregation_states_size();
-  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+  S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto =
+      query_context_proto_->add_aggregation_states();
+  aggr_state_context_proto->set_num_partitions(num_partitions);
 
-  const CatalogRelationInfo *input_relation_info =
-      findRelationInfoOutputByPhysical(physical_plan->input());
-  aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+  S::AggregationOperationState *aggr_state_proto =
+      aggr_state_context_proto->mutable_aggregation_state();
+  aggr_state_proto->set_relation_id(input_relation->getID());
 
   bool use_parallel_initialization = false;
 
@@ -1682,7 +1693,8 @@ void ExecutionGenerator::convertAggregate(
               query_handle_->query_id(),
               *input_relation_info->relation,
               input_relation_info->isStoredRelation(),
-              aggr_state_index));
+              aggr_state_index,
+              num_partitions));
 
   if (!input_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(aggregation_operator_index,
@@ -1695,7 +1707,8 @@ void ExecutionGenerator::convertAggregate(
         execution_plan_->addRelationalOperator(
             new InitializeAggregationOperator(
                 query_handle_->query_id(),
-                aggr_state_index));
+                aggr_state_index,
+                num_partitions));
 
     execution_plan_->addDirectDependency(aggregation_operator_index,
                                          initialize_aggregation_operator_index,
@@ -1715,6 +1728,7 @@ void ExecutionGenerator::convertAggregate(
       execution_plan_->addRelationalOperator(
           new FinalizeAggregationOperator(query_handle_->query_id(),
                                           aggr_state_index,
+                                          num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 
@@ -1734,7 +1748,8 @@ void ExecutionGenerator::convertAggregate(
   const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
       execution_plan_->addRelationalOperator(
           new DestroyAggregationStateOperator(query_handle_->query_id(),
-                                              aggr_state_index));
+                                              aggr_state_index,
+                                              num_partitions));
 
   execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
                                        finalize_aggregation_operator_index,
@@ -1755,13 +1770,22 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
       findRelationInfoOutputByPhysical(physical_plan->left_child());
   const CatalogRelationInfo *right_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->right_child());
+  const CatalogRelation &right_relation = *right_relation_info->relation;
+
+  // TODO(quickstep-team): Support partitioned aggregation.
+  CHECK(!right_relation.hasPartitionScheme());
+  const std::size_t num_partitions = 1u;
 
   // Create aggr state proto.
   const QueryContext::aggregation_state_id aggr_state_index =
       query_context_proto_->aggregation_states_size();
-  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+  S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto =
+      query_context_proto_->add_aggregation_states();
+  aggr_state_context_proto->set_num_partitions(num_partitions);
 
-  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+  S::AggregationOperationState *aggr_state_proto =
+      aggr_state_context_proto->mutable_aggregation_state();
+  aggr_state_proto->set_relation_id(right_relation.getID());
 
   // Group by the right join attribute.
   std::unique_ptr<const Scalar> execution_group_by_expression(
@@ -1807,7 +1831,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
       execution_plan_->addRelationalOperator(
           new InitializeAggregationOperator(
               query_handle_->query_id(),
-              aggr_state_index));
+              aggr_state_index,
+              num_partitions));
 
   const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
       execution_plan_->addRelationalOperator(
@@ -1816,7 +1841,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
               *left_relation_info->relation,
               physical_plan->left_join_attributes().front()->id(),
               left_relation_info->isStoredRelation(),
-              aggr_state_index));
+              aggr_state_index,
+              num_partitions));
 
   if (!left_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
@@ -1828,9 +1854,10 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
       execution_plan_->addRelationalOperator(
           new AggregationOperator(
               query_handle_->query_id(),
-              *right_relation_info->relation,
+              right_relation,
               right_relation_info->isStoredRelation(),
-              aggr_state_index));
+              aggr_state_index,
+              num_partitions));
 
   if (!right_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(aggregation_operator_index,
@@ -1862,6 +1889,7 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
       execution_plan_->addRelationalOperator(
           new FinalizeAggregationOperator(query_handle_->query_id(),
                                           aggr_state_index,
+                                          num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 
@@ -1881,7 +1909,8 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
   const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
       execution_plan_->addRelationalOperator(
           new DestroyAggregationStateOperator(query_handle_->query_id(),
-                                              aggr_state_index));
+                                              aggr_state_index,
+                                              num_partitions));
 
   execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
                                        finalize_aggregation_operator_index,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index c774719..2618e01 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -21,6 +21,7 @@
 
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -41,29 +42,35 @@ bool AggregationOperator::getAllWorkOrders(
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id input_block_id : input_relation_block_ids_) {
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
         container->addNormalWorkOrder(
             new AggregationWorkOrder(
                 query_id_,
                 input_block_id,
-                query_context->getAggregationState(aggr_state_index_),
+                query_context->getAggregationState(aggr_state_index_, part_id),
                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
-      started_ = true;
     }
-    return started_;
+    started_ = true;
+    return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addNormalWorkOrder(
-          new AggregationWorkOrder(
-              query_id_,
-              input_relation_block_ids_[num_workorders_generated_],
-              query_context->getAggregationState(aggr_state_index_),
-              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addNormalWorkOrder(
+            new AggregationWorkOrder(
+                query_id_,
+                input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+                query_context->getAggregationState(aggr_state_index_, part_id),
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
@@ -71,31 +78,38 @@ bool AggregationOperator::getAllWorkOrders(
 
 bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id input_block_id : input_relation_block_ids_) {
-        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
+        container->addWorkOrderProto(createWorkOrderProto(input_block_id, part_id), op_index_);
       }
-      started_ = true;
     }
+    started_ = true;
     return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addWorkOrderProto(
-          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addWorkOrderProto(
+            createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], part_id),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
 }
 
-serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block, const partition_id part_id) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::AGGREGATION);
   proto->set_query_id(query_id_);
 
   proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
   proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+  proto->SetExtension(serialization::AggregationWorkOrder::partition_id, part_id);
   proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 93f4550..1d37e50 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -20,11 +20,13 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATOR_HPP_
 
+#include <cstddef>
 #include <string>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -66,19 +68,34 @@ class AggregationOperator : public RelationalOperator {
    *        is fully available to the operator before it can start generating
    *        workorders.
    * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param num_partitions The number of partitions in 'input_relation'. If no
+   *        partitions, it is one.
    **/
   AggregationOperator(const std::size_t query_id,
                       const CatalogRelation &input_relation,
                       bool input_relation_is_stored,
-                      const QueryContext::aggregation_state_id aggr_state_index)
+                      const QueryContext::aggregation_state_id aggr_state_index,
+                      const std::size_t num_partitions)
       : RelationalOperator(query_id),
         input_relation_(input_relation),
         input_relation_is_stored_(input_relation_is_stored),
-        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
-                                                           : std::vector<block_id>()),
         aggr_state_index_(aggr_state_index),
-        num_workorders_generated_(0),
-        started_(false) {}
+        num_partitions_(num_partitions),
+        input_relation_block_ids_(num_partitions),
+        num_workorders_generated_(num_partitions),
+        started_(false) {
+    if (input_relation_is_stored) {
+      if (input_relation.hasPartitionScheme()) {
+        const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+        for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+          input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+        }
+      } else {
+        // No partition.
+        input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+      }
+    }
+  }
 
   ~AggregationOperator() override {}
 
@@ -104,7 +121,7 @@ class AggregationOperator : public RelationalOperator {
 
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_.push_back(input_block_id);
+    input_relation_block_ids_[part_id].push_back(input_block_id);
   }
 
  private:
@@ -112,15 +129,18 @@ class AggregationOperator : public RelationalOperator {
    * @brief Create Work Order proto.
    *
    * @param block The block id used in the Work Order.
+   * @param part_id The partition id of 'block'.
    **/
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const block_id block, const partition_id part_id);
 
   const CatalogRelation &input_relation_;
   const bool input_relation_is_stored_;
-  std::vector<block_id> input_relation_block_ids_;
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const std::size_t num_partitions_;
 
-  std::vector<block_id>::size_type num_workorders_generated_;
+  // The index is the partition id.
+  std::vector<BlocksInPartition> input_relation_block_ids_;
+  std::vector<std::size_t> num_workorders_generated_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperator);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
index ff65265..5552b75 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.cpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -88,33 +88,39 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id input_block_id : input_relation_block_ids_) {
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
         container->addNormalWorkOrder(
             new BuildAggregationExistenceMapWorkOrder(
                 query_id_,
                 input_relation_,
                 input_block_id,
                 build_attribute_,
-                query_context->getAggregationState(aggr_state_index_),
+                query_context->getAggregationState(aggr_state_index_, part_id),
                 storage_manager),
             op_index_);
       }
-      started_ = true;
     }
+    started_ = true;
     return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addNormalWorkOrder(
-          new BuildAggregationExistenceMapWorkOrder(
-                query_id_,
-                input_relation_,
-                input_relation_block_ids_[num_workorders_generated_],
-                build_attribute_,
-                query_context->getAggregationState(aggr_state_index_),
-                storage_manager),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addNormalWorkOrder(
+            new BuildAggregationExistenceMapWorkOrder(
+                  query_id_,
+                  input_relation_,
+                  input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+                  build_attribute_,
+                  query_context->getAggregationState(aggr_state_index_, part_id),
+                  storage_manager),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
@@ -123,26 +129,32 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
 bool BuildAggregationExistenceMapOperator
     ::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (input_relation_is_stored_) {
-    if (!started_) {
-      for (const block_id block : input_relation_block_ids_) {
-        container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+    if (started_) {
+      return true;
+    }
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id block : input_relation_block_ids_[part_id]) {
+        container->addWorkOrderProto(createWorkOrderProto(block, part_id), op_index_);
       }
-      started_ = true;
     }
+    started_ = true;
     return true;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addWorkOrderProto(
-          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
-          op_index_);
-      ++num_workorders_generated_;
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
+        container->addWorkOrderProto(
+            createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]], part_id),
+            op_index_);
+        ++num_workorders_generated_[part_id];
+      }
     }
     return done_feeding_input_relation_;
   }
 }
 
 serialization::WorkOrder* BuildAggregationExistenceMapOperator
-    ::createWorkOrderProto(const block_id block) {
+    ::createWorkOrderProto(const block_id block, const partition_id part_id) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
   proto->set_query_id(query_id_);
@@ -155,6 +167,8 @@ serialization::WorkOrder* BuildAggregationExistenceMapOperator
                       build_attribute_);
   proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index,
                       aggr_state_index_);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id,
+                      part_id);
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index b28b0b4..b29ad4a 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -21,12 +21,12 @@
 #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
 
 #include <cstddef>
-
 #include <string>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -70,21 +70,36 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
    *        is fully available to the operator before it can start generating
    *        workorders.
    * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param num_partitions The number of partitions in 'input_relation'. If no
+   *        partitions, it is one.
    **/
   BuildAggregationExistenceMapOperator(const std::size_t query_id,
                                        const CatalogRelation &input_relation,
                                        const attribute_id build_attribute,
                                        const bool input_relation_is_stored,
-                                       const QueryContext::aggregation_state_id aggr_state_index)
+                                       const QueryContext::aggregation_state_id aggr_state_index,
+                                       const std::size_t num_partitions)
       : RelationalOperator(query_id),
         input_relation_(input_relation),
         build_attribute_(build_attribute),
         input_relation_is_stored_(input_relation_is_stored),
         aggr_state_index_(aggr_state_index),
-        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
-                                                           : std::vector<block_id>()),
-        num_workorders_generated_(0),
-        started_(false) {}
+        num_partitions_(num_partitions),
+        input_relation_block_ids_(num_partitions),
+        num_workorders_generated_(num_partitions),
+        started_(false) {
+    if (input_relation_is_stored) {
+      if (input_relation.hasPartitionScheme()) {
+        const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
+        for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+          input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+        }
+      } else {
+        // No partition.
+        input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+      }
+    }
+  }
 
   ~BuildAggregationExistenceMapOperator() override {}
 
@@ -113,19 +128,21 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
 
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_.push_back(input_block_id);
+    input_relation_block_ids_[part_id].push_back(input_block_id);
   }
 
  private:
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const block_id block, const partition_id part_id);
 
   const CatalogRelation &input_relation_;
   const attribute_id build_attribute_;
   const bool input_relation_is_stored_;
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const std::size_t num_partitions_;
 
-  std::vector<block_id> input_relation_block_ids_;
-  std::vector<block_id>::size_type num_workorders_generated_;
+  // The index is the partition id.
+  std::vector<BlocksInPartition> input_relation_block_ids_;
+  std::vector<std::size_t> num_workorders_generated_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 5b3f009..5ad9c3b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -87,6 +87,7 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       glog
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -105,6 +106,7 @@ target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMap
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -205,6 +207,7 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator
                       tmb)
 target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator
                       glog
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -581,6 +584,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_SampleOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 62ca9e7..013bf18 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -19,6 +19,7 @@
 
 #include "relational_operators/DestroyAggregationStateOperator.hpp"
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -36,9 +37,11 @@ bool DestroyAggregationStateOperator::getAllWorkOrders(
     tmb::MessageBus *bus) {
   if (blocking_dependencies_met_ && !work_generated_) {
     work_generated_ = true;
-    container->addNormalWorkOrder(
-        new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, query_context),
-        op_index_);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      container->addNormalWorkOrder(
+          new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
+          op_index_);
+    }
   }
   return work_generated_;
 }
@@ -47,18 +50,21 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
   if (blocking_dependencies_met_ && !work_generated_) {
     work_generated_ = true;
 
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+      proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   return work_generated_;
 }
 
 void DestroyAggregationStateWorkOrder::execute() {
-  query_context_->destroyAggregationState(aggr_state_index_);
+  query_context_->destroyAggregationState(aggr_state_index_, part_id_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/DestroyAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
index 70ab45c..990160f 100644
--- a/relational_operators/DestroyAggregationStateOperator.hpp
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -22,6 +22,7 @@
 
 #include <string>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -53,12 +54,16 @@ class DestroyAggregationStateOperator : public RelationalOperator {
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param num_partitions The number of partitions of 'input_relation' in a
+   *        partitioned aggregation. If no partitions, it is one.
    **/
   DestroyAggregationStateOperator(
       const std::size_t query_id,
-      const QueryContext::aggregation_state_id aggr_state_index)
+      const QueryContext::aggregation_state_id aggr_state_index,
+      const std::size_t num_partitions)
       : RelationalOperator(query_id),
         aggr_state_index_(aggr_state_index),
+        num_partitions_(num_partitions),
         work_generated_(false) {}
 
   ~DestroyAggregationStateOperator() override {}
@@ -81,6 +86,7 @@ class DestroyAggregationStateOperator : public RelationalOperator {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const std::size_t num_partitions_;
   bool work_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
@@ -96,14 +102,17 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param part_id The partition id.
    * @param query_context The QueryContext to use.
    **/
   DestroyAggregationStateWorkOrder(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const partition_id part_id,
       QueryContext *query_context)
       : WorkOrder(query_id),
         aggr_state_index_(aggr_state_index),
+        part_id_(part_id),
         query_context_(DCHECK_NOTNULL(query_context)) {}
 
   ~DestroyAggregationStateWorkOrder() override {}
@@ -112,6 +121,7 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const partition_id part_id_;
   QueryContext *query_context_;
 
   DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 77b4b97..14db825 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -19,6 +19,9 @@
 
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 
+#include <cstddef>
+
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -41,19 +44,23 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    AggregationOperationState *agg_state =
-        query_context->getAggregationState(aggr_state_index_);
-    DCHECK(agg_state != nullptr);
-    for (std::size_t part_id = 0;
-         part_id < agg_state->getNumFinalizationPartitions();
-         ++part_id) {
-      container->addNormalWorkOrder(
-          new FinalizeAggregationWorkOrder(
-              query_id_,
-              part_id,
-              agg_state,
-              query_context->getInsertDestination(output_destination_index_)),
-          op_index_);
+
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      AggregationOperationState *agg_state =
+          query_context->getAggregationState(aggr_state_index_, part_id);
+      DCHECK(agg_state != nullptr);
+      for (std::size_t state_part_id = 0;
+           state_part_id < agg_state->getNumFinalizationPartitions();
+           ++state_part_id) {
+        container->addNormalWorkOrder(
+            new FinalizeAggregationWorkOrder(
+                query_id_,
+                part_id,
+                state_part_id,
+                agg_state,
+                query_context->getInsertDestination(output_destination_index_)),
+            op_index_);
+      }
     }
   }
   return started_;
@@ -66,21 +73,28 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
 
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                        aggr_state_index_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                        output_destination_index_);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+                          aggr_state_index_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+                          part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+                          0u);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+                          output_destination_index_);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   return started_;
 }
 
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(partition_id_, output_destination_);
+  (void) part_id_;
+  state_->finalizeAggregate(state_partition_id_, output_destination_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 87533af..5210de2 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -59,6 +59,8 @@ class FinalizeAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param aggr_state_index The index of the AggregationState in QueryContext.
+   * @param num_partitions The number of partitions of 'input_relation' in a
+   *        partitioned aggregation. If no partitions, it is one.
    * @param output_relation The output relation.
    * @param output_destination_index The index of the InsertDestination in the
    *        QueryContext to insert aggregation results.
@@ -66,10 +68,12 @@ class FinalizeAggregationOperator : public RelationalOperator {
   FinalizeAggregationOperator(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const std::size_t num_partitions,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
         aggr_state_index_(aggr_state_index),
+        num_partitions_(num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         started_(false) {}
@@ -102,6 +106,7 @@ class FinalizeAggregationOperator : public RelationalOperator {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const std::size_t num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   bool started_;
@@ -120,18 +125,21 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @note InsertWorkOrder takes ownership of \c state.
    *
    * @param query_id The ID of the query to which this operator belongs.
-   * @param partition_id The partition ID for which the Finalize aggregation
-   *        work order is issued.
+   * @param part_id The partition ID used by 'output_destination'.
+   * @param state_partition_id The partition ID for which the Finalize
+   *        aggregation work order is issued.
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
-                               const std::size_t partition_id,
+                               const std::size_t part_id,
+                               const std::size_t state_partition_id,
                                AggregationOperationState *state,
                                InsertDestination *output_destination)
       : WorkOrder(query_id),
-        partition_id_(partition_id),
+        part_id_(part_id),
+        state_partition_id_(state_partition_id),
         state_(DCHECK_NOTNULL(state)),
         output_destination_(DCHECK_NOTNULL(output_destination)) {}
 
@@ -140,7 +148,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  const std::size_t partition_id_;
+  const std::size_t part_id_, state_partition_id_;
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index b1063ad..e197b08 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -40,18 +40,20 @@ bool InitializeAggregationOperator::getAllWorkOrders(
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
   if (!started_) {
-    AggregationOperationState *agg_state =
-        query_context->getAggregationState(aggr_state_index_);
-    DCHECK(agg_state != nullptr);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      AggregationOperationState *agg_state =
+          query_context->getAggregationState(aggr_state_index_, part_id);
+      DCHECK(agg_state != nullptr);
 
-    for (std::size_t part_id = 0;
-         part_id < agg_state->getNumInitializationPartitions();
-         ++part_id) {
-      container->addNormalWorkOrder(
-          new InitializeAggregationWorkOrder(query_id_,
-                                             part_id,
-                                             agg_state),
-          op_index_);
+      for (std::size_t state_part_id = 0;
+           state_part_id < agg_state->getNumInitializationPartitions();
+           ++state_part_id) {
+        container->addNormalWorkOrder(
+            new InitializeAggregationWorkOrder(query_id_,
+                                               state_part_id,
+                                               agg_state),
+            op_index_);
+      }
     }
     started_ = true;
   }
@@ -63,10 +65,28 @@ bool InitializeAggregationOperator::getAllWorkOrders(
 // initialization with the distributed version.
 bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   LOG(FATAL) << "Not supported";
+
+  if (started_) {
+    return true;
+  }
+
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    serialization::WorkOrder *proto = new serialization::WorkOrder;
+    proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+    proto->set_query_id(query_id_);
+
+    proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+    proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+    proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+
+    container->addWorkOrderProto(proto, op_index_);
+  }
+  started_ = true;
+  return true;
 }
 
 void InitializeAggregationWorkOrder::execute() {
-  state_->initialize(partition_id_);
+  state_->initialize(state_partition_id_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index e81264a..0a9d25d 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -56,11 +56,15 @@ class InitializeAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of this query.
    * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
+   * @param num_partitions The number of partitions in 'input_relation'. If no
+   *        partitions, it is one.
    **/
   InitializeAggregationOperator(const std::size_t query_id,
-                                const QueryContext::aggregation_state_id aggr_state_index)
+                                const QueryContext::aggregation_state_id aggr_state_index,
+                                const std::size_t num_partitions)
       : RelationalOperator(query_id),
         aggr_state_index_(aggr_state_index),
+        num_partitions_(num_partitions),
         started_(false) {}
 
   ~InitializeAggregationOperator() override {}
@@ -83,6 +87,7 @@ class InitializeAggregationOperator : public RelationalOperator {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
+  const std::size_t num_partitions_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
@@ -97,14 +102,14 @@ class InitializeAggregationWorkOrder : public WorkOrder {
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this operator belongs.
-   * @param partition_id The partition ID for which the work order is issued.
+   * @param state_partition_id The partition ID for which the work order is issued.
    * @param state The AggregationOperationState to be initialized.
    */
   InitializeAggregationWorkOrder(const std::size_t query_id,
-                                 const std::size_t partition_id,
+                                 const std::size_t state_partition_id,
                                  AggregationOperationState *state)
       : WorkOrder(query_id),
-        partition_id_(partition_id),
+        state_partition_id_(state_partition_id),
         state_(DCHECK_NOTNULL(state)) {}
 
   ~InitializeAggregationWorkOrder() override {}
@@ -112,7 +117,7 @@ class InitializeAggregationWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  const std::size_t partition_id_;
+  const std::size_t state_partition_id_;
 
   AggregationOperationState *state_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 99b4507..18f0589 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -21,7 +21,7 @@ package quickstep.serialization;
 
 import "relational_operators/SortMergeRunOperator.proto";
 
-// Next tag: 25.
+// Next tag: 26.
 enum WorkOrderType {
   AGGREGATION = 1;
   BUILD_AGGREGATION_EXISTENCE_MAP = 23;
@@ -35,6 +35,7 @@ enum WorkOrderType {
   DROP_TABLE = 8;
   FINALIZE_AGGREGATION = 9;
   HASH_JOIN = 10;
+  INITIALIZE_AGGREGATION = 25;
   INSERT = 11;
   NESTED_LOOP_JOIN = 12;
   SAMPLE = 13;
@@ -58,10 +59,12 @@ message WorkOrder {
   extensions 16 to max;
 }
 
+// Next tag: 21.
 message AggregationWorkOrder {
   extend WorkOrder {
     // All required.
     optional uint32 aggr_state_index = 16;
+    optional uint64 partition_id = 20;
     optional fixed64 block_id = 17;
     optional int32 lip_deployment_index = 18;
     repeated uint32 lip_filter_indexes = 19;
@@ -74,6 +77,7 @@ message BuildAggregationExistenceMapWorkOrder {
     optional fixed64 build_block_id = 369;
     optional int32 build_attribute = 370;
     optional uint32 aggr_state_index = 371;
+    optional uint64 partition_id = 372;
   }
 }
 
@@ -118,6 +122,7 @@ message DeleteWorkOrder {
 message DestroyAggregationStateWorkOrder {
   extend WorkOrder {
     optional uint32 aggr_state_index = 352;
+    optional uint64 partition_id = 353;
   }
 }
 
@@ -138,10 +143,13 @@ message DropTableWorkOrder {
   }
 }
 
+// Next tag: 148.
 message FinalizeAggregationWorkOrder {
   extend WorkOrder {
     // All required.
     optional uint32 aggr_state_index = 144;
+    optional uint64 partition_id = 146;
+    optional uint64 state_partition_id = 147;
     optional int32 insert_destination_index = 145;
   }
 }
@@ -178,6 +186,15 @@ message HashJoinWorkOrder {
   }
 }
 
+message InitializeAggregationWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 aggr_state_index = 400;
+    optional uint64 partition_id = 401;
+    optional uint64 state_partition_id = 402;
+  }
+}
+
 message InsertWorkOrder {
   extend WorkOrder {
     // All required.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 4c1ffa9..48bf956 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -38,6 +38,7 @@
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
 #include "relational_operators/InsertOperator.hpp"
 #include "relational_operators/NestedLoopsJoinOperator.hpp"
 #include "relational_operators/SampleOperator.hpp"
@@ -88,18 +89,24 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
-      LOG(INFO) << "Creating AggregationWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::AggregationWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating AggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new AggregationWorkOrder(
           query_id,
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
           query_context->getAggregationState(
-              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
+              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index), part_id),
           CreateLIPFilterAdaptiveProberHelper(
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
-      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
 
       return new BuildAggregationExistenceMapWorkOrder(
@@ -109,7 +116,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
           query_context->getAggregationState(
-              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index), part_id),
           storage_manager);
     }
     case serialization::BUILD_LIP_FILTER: {
@@ -171,12 +178,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
-      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new DestroyAggregationStateWorkOrder(
           query_id,
           proto.GetExtension(
               serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+          part_id,
           query_context);
     }
     case serialization::DESTROY_HASH: {
@@ -210,15 +221,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           catalog_database);
     }
     case serialization::FINALIZE_AGGREGATION: {
-      LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::FinalizeAggregationWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       // TODO(quickstep-team): Handle inner-table partitioning in the distributed
       // setting.
       return new FinalizeAggregationWorkOrder(
           query_id,
-          0uL /* partition_id */,
+          part_id,
+          proto.GetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id),
           query_context->getAggregationState(proto.GetExtension(
-              serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
+              serialization::FinalizeAggregationWorkOrder::aggr_state_index), part_id),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
                                      insert_destination_index)));
@@ -354,6 +369,20 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto";
       }
     }
+    case serialization::INITIALIZE_AGGREGATION: {
+      const partition_id part_id =
+          proto.GetExtension(serialization::InitializeAggregationWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating InitializeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
+                << " in Shiftboss " << shiftboss_index;
+      AggregationOperationState *aggr_state =
+          query_context->getAggregationState(
+              proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index), part_id);
+      return new InitializeAggregationWorkOrder(query_id,
+                                                proto.GetExtension(
+                                                    serialization::InitializeAggregationWorkOrder::state_partition_id),
+                                                aggr_state);
+    }
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new InsertWorkOrder(
@@ -578,8 +607,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
 
       return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&
              proto.HasExtension(serialization::AggregationWorkOrder::aggr_state_index) &&
+             proto.HasExtension(serialization::AggregationWorkOrder::partition_id) &&
              query_context.isValidAggregationStateId(
-                 proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
+                 proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index),
+                 proto.GetExtension(serialization::AggregationWorkOrder::partition_id));
     }
     case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
       if (!proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)) {
@@ -601,8 +632,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
 
       return proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id) &&
              proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index) &&
+             proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id) &&
              query_context.isValidAggregationStateId(
-                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index));
+                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index),
+                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::partition_id));
     }
     case serialization::BUILD_HASH: {
       if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
@@ -680,8 +713,10 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
       return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
+             proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::partition_id) &&
              query_context.isValidAggregationStateId(
-                 proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index));
+                 proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
+                 proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id));
     }
     case serialization::DESTROY_HASH: {
       return proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) &&
@@ -695,8 +730,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
     }
     case serialization::FINALIZE_AGGREGATION: {
       return proto.HasExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index) &&
+             proto.HasExtension(serialization::FinalizeAggregationWorkOrder::partition_id) &&
              query_context.isValidAggregationStateId(
-                 proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)) &&
+                 proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index),
+                 proto.GetExtension(serialization::FinalizeAggregationWorkOrder::partition_id)) &&
+             proto.HasExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id) &&
              proto.HasExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index));
@@ -775,6 +813,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                  proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
              proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
     }
+    case serialization::INITIALIZE_AGGREGATION: {
+      return proto.HasExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index) &&
+             proto.HasExtension(serialization::InitializeAggregationWorkOrder::partition_id) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index),
+                 proto.GetExtension(serialization::InitializeAggregationWorkOrder::partition_id)) &&
+             proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id);
+    }
     case serialization::INSERT: {
       return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6ac59d5/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 6881dea..0690b6b 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -82,6 +82,7 @@ namespace quickstep {
 namespace {
 constexpr std::size_t kQueryId = 0;
 constexpr int kOpIndex = 0;
+constexpr std::size_t kNumPartitions = 1u;
 }  // namespace
 
 class Type;
@@ -234,7 +235,8 @@ class AggregationOperatorTest : public ::testing::Test {
     query_context_proto.set_query_id(0);  // dummy query ID.
 
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
-    serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
+    serialization::AggregationOperationState *aggr_state_proto =
+        query_context_proto.add_aggregation_states()->mutable_aggregation_state();
     aggr_state_proto->set_relation_id(table_->getID());
 
     // Add an aggregate.
@@ -276,7 +278,7 @@ class AggregationOperatorTest : public ::testing::Test {
     aggr_state_proto->set_estimated_num_entries(estimated_entries);
 
     // Create Operators.
-    op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));
+    op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index, kNumPartitions));
 
     // Setup the InsertDestination proto in the query context proto.
     const QueryContext::insert_destination_id insert_destination_index =
@@ -290,11 +292,12 @@ class AggregationOperatorTest : public ::testing::Test {
     finalize_op_.reset(
         new FinalizeAggregationOperator(kQueryId,
                                         aggr_state_index,
+                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 
     destroy_aggr_state_op_.reset(
-        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index, kNumPartitions));
 
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
@@ -331,7 +334,8 @@ class AggregationOperatorTest : public ::testing::Test {
     query_context_proto.set_query_id(0);  // dummy query ID.
 
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
-    serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
+    serialization::AggregationOperationState *aggr_state_proto =
+        query_context_proto.add_aggregation_states()->mutable_aggregation_state();
     aggr_state_proto->set_relation_id(table_->getID());
 
     // Add an aggregate.
@@ -368,7 +372,7 @@ class AggregationOperatorTest : public ::testing::Test {
         serialization::HashTableImplType::SEPARATE_CHAINING);
 
     // Create Operators.
-    op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));
+    op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index, kNumPartitions));
 
     // Setup the InsertDestination proto in the query context proto.
     const QueryContext::insert_destination_id insert_destination_index =
@@ -382,11 +386,12 @@ class AggregationOperatorTest : public ::testing::Test {
     finalize_op_.reset(
         new FinalizeAggregationOperator(kQueryId,
                                         aggr_state_index,
+                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 
     destroy_aggr_state_op_.reset(
-        new DestroyAggregationStateOperator(kQueryId, aggr_state_index));
+        new DestroyAggregationStateOperator(kQueryId, aggr_state_index, kNumPartitions));
 
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,