Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/21 19:23:39 UTC

incubator-quickstep git commit: Support for performing partitioned aggregation.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/partitioned-aggregate-new [created] f7183fcff


Support for performing partitioned aggregation.

- Added a PartitionedHashTablePool: a pool of hash tables in which each
  hash table belongs to a unique partition.
- The partitioning is done on the GROUP BY keys.
- Wrote a utility function to compute the composite hash of a group of
  TypedValues (a simplified sketch follows below).
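
A simplified sketch of that utility (it mirrors the HashCompositeKey()
function added in utility/CompositeHash.hpp further down; the helper name
here is illustrative):

    // Fold the per-value hashes of a composite key into a single hash.
    std::size_t HashSketch(const std::vector<TypedValue> &key) {
      std::size_t hash = key.front().getHash();
      for (std::size_t i = 1; i < key.size(); ++i) {
        hash = CombineHashes(hash, key[i].getHash());
      }
      return hash;
    }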

- Added a check for whether the aggregation is partitioned or not.

- The conditions under which the aggregation can be partitioned are as
  follows (a condensed sketch of the check follows this list):
  1. The query has a GROUP BY clause.
  2. There are no aggregations with a DISTINCT clause.
  3. The estimated number of groups is greater than a pre-defined
     threshold.
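
A condensed restatement of that check (the full version is the
checkAggregatePartitioned() method added to AggregationOperationState.hpp in
this patch; the free-function form here is only for illustration):

    // Returns true iff a partitioned aggregation should be used.
    // kPartitionedAggregateThreshold (100) is defined in the patch.
    bool canPartitionAggregate(const std::size_t estimated_num_groups,
                               const std::vector<bool> &is_distinct,
                               const std::size_t num_group_by_expressions) {
      for (const bool distinct : is_distinct) {
        if (distinct) return false;                      // Condition 2.
      }
      if (num_group_by_expressions == 0) return false;   // Condition 1.
      return estimated_num_groups > kPartitionedAggregateThreshold;  // Condition 3.
    }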

- Method for partitioned aggregation with GROUP BY

- StorageBlock now provides a method for performing GROUP BY aggregation
  in a partitioned way.
- The Tuple class now supports a method to compute the hash of the entire
  tuple (i.e. the hash key is the composite key made up of all the
  attributes in the tuple).
- AggregationOperationState calls the appropriate method (i.e.
  aggregateGroupBy or aggregateGroupByPartitioned) depending on how the
  aggregation is being performed; the routing step is sketched below.
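
The core of the routing, condensed from the loop that
StorageBlock::aggregateGroupByPartitioned() adds below (variable names follow
that code; this is an illustration, not additional functionality):

    // Each tuple's GROUP BY key is hashed; the hash modulo the number of
    // partitions selects the hash table that will aggregate this tuple.
    const std::size_t num_partitions = hashtable_pool->getNumPartitions();
    const std::size_t curr_tuple_partition_id =
        temp_result.getTupleWithAttributes(key_ids)->getTupleHash() %
        num_partitions;
    partition_membership[curr_tuple_partition_id]->set(
        temp_result.getCurrentPosition(), true);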


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f7183fcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f7183fcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f7183fcf

Branch: refs/heads/partitioned-aggregate-new
Commit: f7183fcfff7f7b660f7a3b761b9240e13a231c83
Parents: ac3512c
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Sep 21 11:43:39 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Sep 21 13:39:25 2016 -0500

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp |  78 +++++++----
 storage/AggregationOperationState.hpp |  42 ++++++
 storage/CMakeLists.txt                |  12 ++
 storage/PartitionedHashTablePool.hpp  | 205 +++++++++++++++++++++++++++++
 storage/StorageBlock.cpp              | 115 ++++++++++++++++
 storage/StorageBlock.hpp              |  43 ++++++
 types/containers/CMakeLists.txt       |   1 +
 types/containers/Tuple.hpp            |   8 ++
 utility/CMakeLists.txt                |   6 +
 utility/CompositeHash.hpp             |  52 ++++++++
 10 files changed, 533 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 073b813..ac78b20 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -73,7 +73,9 @@ AggregationOperationState::AggregationOperationState(
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
-      storage_manager_(storage_manager) {
+      storage_manager_(storage_manager),
+      is_aggregate_partitioned_(checkAggregatePartitioned(
+          estimated_num_entries, is_distinct_, group_by)) {
   // Sanity checks: each aggregate has a corresponding list of arguments.
   DCHECK(aggregate_functions.size() == arguments_.size());
 
@@ -166,18 +168,16 @@ AggregationOperationState::AggregationOperationState(
       }
 
       // Initialize the corresponding distinctify hash table if this is a
-      // DISTINCT
-      // aggregation.
+      // DISTINCT aggregation.
       if (*is_distinct_it) {
         std::vector<const Type *> key_types(group_by_types);
         key_types.insert(
             key_types.end(), argument_types.begin(), argument_types.end());
         // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating
-        // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value
-        // during
-        // query optimization, if it worths.
+        // estimating the number of entries in the distinctify hash table.
+        // We may estimate for each distinct aggregation an
+        // estimated_num_distinct_keys value during query optimization, if it's
+        // worth it.
         distinctify_hashtables_.emplace_back(
             AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
@@ -193,14 +193,24 @@ AggregationOperationState::AggregationOperationState(
     }
 
     if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group
-      // states.
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types,
-                                                       payload_sizes,
-                                                       group_by_handles,
-                                                       storage_manager));
+      // Aggregation with GROUP BY: create a HashTable pool.
+      if (!is_aggregate_partitioned_) {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types,
+                                                         payload_sizes,
+                                                         group_by_handles,
+                                                         storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         kNumPartitionsForAggregate,
+                                         hash_table_impl_type,
+                                         group_by_types,
+                                         payload_sizes,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     }
   }
 }
@@ -439,20 +449,30 @@ void AggregationOperationState::aggregateBlockHashTable(
     }
   }
 
-  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-  // directly into the (threadsafe) shared global HashTable for this
-  // aggregate.
-  DCHECK(group_by_hashtable_pool_ != nullptr);
-  AggregationStateHashTableBase *agg_hash_table =
+  if (!is_aggregate_partitioned_) {
+    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+    // directly into the (threadsafe) shared global HashTable for this
+    // aggregate.
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    AggregationStateHashTableBase *agg_hash_table =
       group_by_hashtable_pool_->getHashTableFast();
-  DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupBy(arguments_,
-                          group_by_list_,
-                          predicate_.get(),
-                          agg_hash_table,
-                          &reuse_matches,
-                          &reuse_group_by_vectors);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    DCHECK(agg_hash_table != nullptr);
+    block->aggregateGroupBy(arguments_,
+        group_by_list_,
+        predicate_.get(),
+        agg_hash_table,
+        &reuse_matches,
+        &reuse_group_by_vectors);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  } else {
+    block->aggregateGroupByPartitioned(
+        arguments_,
+        group_by_list_,
+        predicate_.get(),
+        &reuse_matches,
+        &reuse_group_by_vectors,
+        partitioned_group_by_hashtable_pool_.get());
+  }
 }
 
 void AggregationOperationState::finalizeSingleState(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..cebb3e6 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -32,6 +32,7 @@
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
@@ -173,6 +174,9 @@ class AggregationOperationState {
   int dflag;
 
  private:
+  static constexpr std::size_t kPartitionedAggregateThreshold = 100;
+  static constexpr std::size_t kNumPartitionsForAggregate = 40;
+
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
   void mergeSingleState(
@@ -185,6 +189,40 @@ class AggregationOperationState {
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);
 
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by) const {
+    // Check if there's a DISTINCT operation involved in any aggregate; if so,
+    // the aggregate can't be partitioned.
+    for (auto distinct : is_distinct) {
+      if (distinct) {
+        return false;
+      }
+    }
+    // There's no distinct aggregation involved. Check if there's at least one
+    // GROUP BY operation.
+    if (group_by.empty()) {
+      return false;
+    }
+    // There are GROUP BYs without DISTINCT. Check if the estimated number of
+    // groups is large enough to warrant a partitioned aggregation.
+    return estimated_num_groups > kPartitionedAggregateThreshold;
+  }
+
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @note This method is relevant only when the aggregate is partitioned.
+   **/
+  std::size_t getNumPartitions() const {
+    return is_aggregate_partitioned_
+               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
+               : 1;
+  }
+
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
@@ -224,8 +262,12 @@ class AggregationOperationState {
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
+  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
   StorageManager *storage_manager_;
 
+  // Whether the aggregation is partitioned or not.
+  const bool is_aggregate_partitioned_;
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..9c1ebd0 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock
 add_library(quickstep_storage_PackedRowStoreValueAccessor
             ../empty_src.cpp
             PackedRowStoreValueAccessor.hpp)
+add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
 add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp)
@@ -289,6 +290,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -850,6 +852,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor
                       quickstep_types_TypedValue
                       quickstep_utility_BitVector
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+                      glog
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_HashTableBase
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -971,6 +981,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SplitRowStoreTupleStorageSubBlock
                       quickstep_storage_StorageBlockBase
@@ -1167,6 +1178,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
                       quickstep_storage_PackedRowStoreValueAccessor
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
new file mode 100644
index 0000000..7f58fa9
--- /dev/null
+++ b/storage/PartitionedHashTablePool.hpp
@@ -0,0 +1,205 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin-Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. Each
+ *        HashTable represents values from a given partition, which is
+ *        determined by the keys in the group by clause.
+ **/
+class PartitionedHashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param num_partitions The number of partitions (i.e. number of HashTables)
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointers to the Types that form the
+   *        GROUP BY key.
+   * @param agg_handle The aggregation handle.
+   * @param storage_manager A pointer to the storage manager.
+   *
+   * @note The estimate of the number of entries is quite inaccurate at this
+   *       time. If we go by the current estimate, each hash table demands much
+   *       more space than it actually needs, which causes the system to either
+   *       trigger evictions or, worse, run out of memory. To work around this,
+   *       we divide the estimate by the number of partitions. The division does
+   *       not affect correctness; however, it may allocate some hash tables
+   *       less space than they require, causing them to be resized during the
+   *       build phase, which carries a performance penalty.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           AggregationHandle *agg_handle,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            reduceEstimatedCardinality(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           const std::vector<std::size_t> &payload_sizes,
+                           const std::vector<AggregationHandle *> &handles,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            reduceEstimatedCardinality(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        payload_sizes_(payload_sizes),
+        handles_(handles),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Check out a hash table for insertion.
+   *
+   * @param partition_id The ID of the partition.
+   *
+   * @return A pointer to the hash table for the given partition.
+   **/
+  AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Check out a hash table for insertion.
+   *
+   * @param partition_id The ID of the partition.
+   *
+   * @return A pointer to the hash table for the given partition.
+   **/
+  AggregationStateHashTableBase* getHashTableFast(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash
+   *          table is being checked in or checked out from the pool. In other
+   *          words, the hash table pool is in a read-only state.
+   *
+   * @return All the hash tables in the pool.
+   *
+   **/
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+ private:
+  void initializeAllHashTables() {
+    for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
+      AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+      hash_tables_.push_back(
+          std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
+    }
+  }
+
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  AggregationStateHashTableBase* createNewHashTableFast() {
+    return AggregationStateFastHashTableFactory::CreateResizable(
+                hash_table_impl_type_,
+                group_by_types_,
+                estimated_num_entries_,
+                payload_sizes_,
+                handles_,
+                storage_manager_);
+  }
+
+  inline std::size_t reduceEstimatedCardinality(
+      const std::size_t original_estimate,
+      const std::size_t num_partitions) const {
+    CHECK_NE(num_partitions, 0u);
+    return original_estimate / num_partitions;
+  }
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const std::size_t num_partitions_;
+
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  std::vector<std::size_t> payload_sizes_;
+
+  AggregationHandle *agg_handle_;
+  const std::vector<AggregationHandle *> handles_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..94c46a8 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -41,6 +41,7 @@
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/SMAIndexSubBlock.hpp"
 #include "storage/SplitRowStoreTupleStorageSubBlock.hpp"
 #include "storage/StorageBlockBase.hpp"
@@ -1369,4 +1370,118 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const Predicate *predicate,
+    std::unique_ptr<TupleIdSequence> *reuse_matches,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+    PartitionedHashTablePool *hashtable_pool) const {
+  DCHECK(!group_by.empty())
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+  std::vector<attribute_id> arg_ids;
+  std::vector<attribute_id> argument_ids;
+
+  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+  std::vector<attribute_id> key_ids;
+
+  // An intermediate ValueAccessor that stores the materialized 'arguments' for
+  // this aggregate, as well as the GROUP BY expression values.
+  ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ValueAccessor> accessor;
+  if (predicate) {
+    if (!*reuse_matches) {
+      // If there is a filter predicate that hasn't already been evaluated,
+      // evaluate it now and save the results for other aggregates on this
+      // same block.
+      reuse_matches->reset(getMatchesForPredicate(predicate));
+    }
+
+    // Create a filtered ValueAccessor that only iterates over predicate
+    // matches.
+    accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+  } else {
+    // Create a ValueAccessor that iterates over all tuples in this block
+    accessor.reset(tuple_store_->createValueAccessor());
+  }
+
+  attribute_id attr_id = 0;
+
+  // First, put GROUP BY keys into 'temp_result'.
+  if (reuse_group_by_vectors->empty()) {
+    // Compute GROUP BY values from group_by Scalars, and store them in
+    // reuse_group_by_vectors for reuse by other aggregates on this same
+    // block.
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  } else {
+    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+      temp_result.addColumn(reuse_cv.get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  }
+
+  // Compute argument vectors and add them to 'temp_result'.
+  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+    arg_ids.clear();
+    for (const std::unique_ptr<const Scalar> &args : argument) {
+      temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+      argument_ids.push_back(attr_id++);
+    }
+    if (argument.empty()) {
+      argument_ids.push_back(kInvalidAttributeID);
+    }
+  }
+
+  // Compute the partitions for the tuple formed by group by values.
+  std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+  partition_membership.resize(hashtable_pool->getNumPartitions());
+
+  // Create a tuple-id sequence for each partition.
+  for (std::size_t partition = 0;
+       partition < hashtable_pool->getNumPartitions();
+       ++partition) {
+    partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition()));
+  }
+
+  // Iterate over the ValueAccessor and, for each tuple, set a bit in the
+  // appropriate TupleIdSequence.
+  temp_result.beginIteration();
+  while (temp_result.next()) {
+    const std::size_t curr_tuple_partition_id =
+        temp_result.getTupleWithAttributes(key_ids)->getTupleHash() %
+        hashtable_pool->getNumPartitions();
+    partition_membership[curr_tuple_partition_id]->set(
+        temp_result.getCurrentPosition(), true);
+  }
+  // For each partition, create an adapter around Value Accessor and
+  // TupleIdSequence.
+  std::vector<std::unique_ptr<
+      TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+  adapter.resize(hashtable_pool->getNumPartitions());
+  for (std::size_t partition = 0;
+       partition < hashtable_pool->getNumPartitions();
+       ++partition) {
+    adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+        *partition_membership[partition]));
+    hashtable_pool->getHashTable(partition)
+        ->upsertValueAccessorCompositeKeyFast(
+            argument_ids, adapter[partition].get(), key_ids, true);
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..3fcaddf 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class InsertDestinationInterface;
+class PartitionedHashTablePool;
 class Predicate;
 class Scalar;
 class StorageBlockLayout;
@@ -466,6 +467,48 @@ class StorageBlock : public StorageBlockBase {
       std::unique_ptr<TupleIdSequence> *reuse_matches,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
+
+  /**
+   * @brief Perform the GROUP BY aggregation for the case when aggregation is
+   *        partitioned.
+   *
+   * @note The difference between this method and the aggregateGroupBy method
+   *       is that in this method, the tuples are routed to different HashTables
+   *       based on the partition to which they belong. The partition is
+   *       determined by the GROUP BY attributes. Currently, hash-based
+   *       partitioning is performed.
+   *
+   * @param arguments The arguments to the aggregation function as Scalars.
+   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
+   *        this storage block are grouped by these attributes before
+   *        aggregation.
+   * @param predicate A predicate for selection. nullptr indicates that all
+   *        tuples should be aggregated on.
+   * @param reuse_matches This parameter is used to store and reuse the tuple-id
+   *        sequence of matches pre-computed in an earlier invocation of
+   *        aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
+   *        use. The current invocation will reuse the TupleIdSequence if
+   *        passed; otherwise it computes a TupleIdSequence based on
+   *        \c predicate and stores it in \c reuse_matches. We use
+   *        std::unique_ptr for ease of use, so that the caller does not have
+   *        to selectively free it.
+   * @param reuse_group_by_vectors This parameter is used to store and reuse
+   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
+   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
+   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
+   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
+   *        on \c group_by and stores them in \c reuse_group_by_vectors.
+   * @param hashtable_pool The pool of aggregation HashTables. Each hash table
+   *        in this pool belongs to a unique partition.
+   **/
+  void aggregateGroupByPartitioned(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const Predicate *predicate,
+      std::unique_ptr<TupleIdSequence> *reuse_matches,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+      PartitionedHashTablePool *hashtable_pool) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
                       quickstep_catalog_CatalogTypedefs
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple_proto
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
     return attribute_values_.size();
   }
 
+  /**
+   * @brief Get the hash value of the tuple.
+   **/
+  std::size_t getTupleHash() const {
+    return HashCompositeKey(attribute_values_);
+  }
+
  private:
   /**
    * @brief Constructor which does not create any attributes, nor pre-reserve

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ddaae45..4fb6e5b 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -167,6 +167,7 @@ add_library(quickstep_utility_BloomFilter_proto
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
 add_library(quickstep_utility_ExecutionDAGVisualizer
@@ -227,6 +228,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_CompositeHash
+                      quickstep_types_TypedValue
+                      quickstep_utility_HashPair
+                      glog)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -318,6 +323,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f7183fcf/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ * @return The hash value.
+ **/
+static std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_