You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/30 23:19:17 UTC

[42/50] [abbrv] incubator-quickstep git commit: Groupby hashtable pool (#236)

Groupby hashtable pool (#236)

- Created a HashTablePool class for group by clause.
- Each thread can checkout it's own hash table while doing group by
  aggregation.
- AggregationOperationState uses one hash table pool per group by
  clause.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ddb67bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ddb67bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ddb67bf

Branch: refs/heads/work-order-serialization
Commit: 2ddb67bf438878b572e997caec2397e4c7ac9b8f
Parents: 5bda90e
Author: Harshad Deshmukh <d....@gmail.com>
Authored: Tue May 24 19:04:39 2016 -0500
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Mon May 30 15:47:53 2016 -0700

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.hpp   | 105 ++++++++++++
 expressions/aggregation/AggregationHandle.hpp   |  15 +-
 .../aggregation/AggregationHandleAvg.cpp        |   9 +
 .../aggregation/AggregationHandleAvg.hpp        |   4 +
 .../aggregation/AggregationHandleCount.cpp      |  11 ++
 .../aggregation/AggregationHandleCount.hpp      |   4 +
 .../aggregation/AggregationHandleDistinct.hpp   |   7 +
 .../aggregation/AggregationHandleMax.cpp        |   9 +
 .../aggregation/AggregationHandleMax.hpp        |   4 +
 .../aggregation/AggregationHandleMin.cpp        |   9 +
 .../aggregation/AggregationHandleMin.hpp        |   4 +
 .../aggregation/AggregationHandleSum.cpp        |   9 +
 .../aggregation/AggregationHandleSum.hpp        |   4 +
 expressions/aggregation/CMakeLists.txt          |   2 +
 .../tests/AggregationHandleAvg_unittest.cpp     | 109 ++++++++++++
 .../tests/AggregationHandleCount_unittest.cpp   | 126 +++++++++++++-
 .../tests/AggregationHandleMax_unittest.cpp     | 122 ++++++++++++++
 .../tests/AggregationHandleMin_unittest.cpp     | 121 ++++++++++++++
 .../tests/AggregationHandleSum_unittest.cpp     | 124 ++++++++++++++
 query_execution/QueryContext.hpp                |   2 +-
 storage/AggregationOperationState.cpp           |  84 ++++++++--
 storage/AggregationOperationState.hpp           |   4 +
 storage/CMakeLists.txt                          |  10 ++
 storage/HashTablePool.hpp                       | 166 +++++++++++++++++++
 storage/StorageManager.cpp                      |   3 +-
 25 files changed, 1045 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 52249f7..0267e17 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -44,6 +44,90 @@ class ValueAccessor;
  *  @{
  */
 
+/**
+ * @brief An upserter class for modifying the destination hash table while
+ *        merging two group by hash tables.
+ **/
+template <typename HandleT, typename StateT>
+class HashTableStateUpserter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param handle The aggregation handle being used.
+   * @param source_state The aggregation state in the source aggregation hash
+   *        table. The corresponding state (for the same key) in the destination
+   *        hash table will be upserted.
+   **/
+  HashTableStateUpserter(const HandleT &handle, const StateT &source_state)
+      : handle_(handle), source_state_(source_state) {}
+
+  /**
+   * @brief The operator for the functor required for the upsert.
+   *
+   * @param destination_state The aggregation state in the aggregation hash
+   *        table that is being upserted.
+   **/
+  void operator()(StateT *destination_state) {
+    handle_.mergeStates(source_state_, destination_state);
+  }
+
+ private:
+  const HandleT &handle_;
+  const StateT &source_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter);
+};
+
+/**
+ * @brief A class to support the functor for merging group by hash tables.
+ **/
+template <typename HandleT, typename StateT, typename HashTableT>
+class HashTableMerger {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param handle The Aggregation handle being used.
+   * @param destination_hash_table The destination hash table to which other
+   *        hash tables will be merged.
+   **/
+  HashTableMerger(const HandleT &handle,
+                  AggregationStateHashTableBase *destination_hash_table)
+      : handle_(handle),
+        destination_hash_table_(
+            static_cast<HashTableT *>(destination_hash_table)) {}
+
+  /**
+   * @brief The operator for the functor.
+   *
+   * @param group_by_key The group by key being merged.
+   * @param source_state The aggregation state for the given key in the source
+   *        aggregation hash table.
+   **/
+  inline void operator()(const std::vector<TypedValue> &group_by_key,
+                         const StateT &source_state) {
+    const StateT *original_state =
+        destination_hash_table_->getSingleCompositeKey(group_by_key);
+    if (original_state != nullptr) {
+      HashTableStateUpserter<HandleT, StateT> upserter(
+          handle_, source_state);
+      // The CHECK is required as upsertCompositeKey can return false if the
+      // hash table runs out of space during the upsert process. The ideal
+      // solution will be to retry again if the upsert fails.
+      CHECK(destination_hash_table_->upsertCompositeKey(
+          group_by_key, *original_state, &upserter));
+    } else {
+      destination_hash_table_->putCompositeKey(group_by_key, source_state);
+    }
+  }
+
+ private:
+  const HandleT &handle_;
+  HashTableT *destination_hash_table_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
+};
 
 /**
  * @brief The helper intermediate subclass of AggregationHandle that provides
@@ -140,6 +224,11 @@ class AggregationConcreteHandle : public AggregationHandle {
     return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state);
   }
 
+  template <typename HandleT, typename StateT, typename HashTableT>
+  void mergeGroupByHashTablesHelper(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
 };
@@ -373,6 +462,22 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
   }
 }
 
+template <typename HandleT,
+          typename StateT,
+          typename HashTableT>
+void AggregationConcreteHandle::mergeGroupByHashTablesHelper(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  const HashTableT &source_hash_table_concrete =
+      static_cast<const HashTableT &>(source_hash_table);
+
+  HashTableMerger<HandleT, StateT, HashTableT> merger(handle,
+                                                      destination_hash_table);
+
+  source_hash_table_concrete.forEachCompositeKey(&merger);
+}
+
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 625f334..cdebb03 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -276,7 +276,7 @@ class AggregationHandle {
    * each GROUP BY group. Later, a second-round aggregation on the distinctify
    * hash table will be performed to actually compute the aggregated result for
    * each GROUP BY group.
-   * 
+   *
    * In the case of single aggregation where there is no GROUP BY expressions,
    * we simply treat it as a special GROUP BY case that the GROUP BY expression
    * vector is empty.
@@ -349,6 +349,19 @@ class AggregationHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const = 0;
 
+  /**
+   * @brief Merge two GROUP BY hash tables in one.
+   *
+   * @note Both the hash tables should have the same structure.
+   *
+   * @param source_hash_table The hash table which will get merged.
+   * @param destination_hash_table The hash table to which we will merge the
+   *        other hash table.
+   **/
+  virtual void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const = 0;
+
  protected:
   AggregationHandle() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index cb0d63d..42a2fb9 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -203,4 +203,13 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
           aggregation_hash_table);
 }
 
+void AggregationHandleAvg::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  mergeGroupByHashTablesHelper<AggregationHandleAvg,
+                               AggregationStateAvg,
+                               AggregationStateHashTable<AggregationStateAvg>>(
+      source_hash_table, destination_hash_table);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 6a94ee6..4ad4b21 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -158,6 +158,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override;
+
  private:
   friend class AggregateFunctionAvg;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 5ece8ba..964b7c2 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -206,6 +206,17 @@ void AggregationHandleCount<count_star, nullable_type>
           aggregation_hash_table);
 }
 
+template <bool count_star, bool nullable_type>
+void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  mergeGroupByHashTablesHelper<
+      AggregationHandleCount,
+      AggregationStateCount,
+      AggregationStateHashTable<AggregationStateCount>>(source_hash_table,
+                                                        destination_hash_table);
+}
+
 // Explicitly instantiate and compile in the different versions of
 // AggregationHandleCount we need. Note that we do not compile a version with
 // 'count_star == true' and 'nullable_type == true', as that combination is

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 6bb4e65..50138b9 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -166,6 +166,10 @@ class AggregationHandleCount : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override;
+
  private:
   friend class AggregateFunctionCount;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index 918fdf8..6342c2b 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -109,6 +109,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &hash_table,
       std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override {
+    LOG(FATAL)
+        << "AggregationHandleDistinct does not support mergeGroupByHashTables";
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 4703657..a7a4a52 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -139,4 +139,13 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
           aggregation_hash_table);
 }
 
+void AggregationHandleMax::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  mergeGroupByHashTablesHelper<AggregationHandleMax,
+                               AggregationStateMax,
+                               AggregationStateHashTable<AggregationStateMax>>(
+      source_hash_table, destination_hash_table);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 8932ef8..5af5a12 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -151,6 +151,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override;
+
  private:
   friend class AggregateFunctionMax;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index de2709a..ca9b163 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -141,4 +141,13 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
           aggregation_hash_table);
 }
 
+void AggregationHandleMin::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  mergeGroupByHashTablesHelper<AggregationHandleMin,
+                               AggregationStateMin,
+                               AggregationStateHashTable<AggregationStateMin>>(
+      source_hash_table, destination_hash_table);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 4e4c05d..f68bb9d 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -149,6 +149,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override;
+
  private:
   friend class AggregateFunctionMin;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 14421d2..691ff39 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -190,4 +190,13 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
           aggregation_hash_table);
 }
 
+void AggregationHandleSum::mergeGroupByHashTables(
+    const AggregationStateHashTableBase &source_hash_table,
+    AggregationStateHashTableBase *destination_hash_table) const {
+  mergeGroupByHashTablesHelper<AggregationHandleSum,
+                               AggregationStateSum,
+                               AggregationStateHashTable<AggregationStateSum>>(
+      source_hash_table, destination_hash_table);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index b765243..fdc0884 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -148,6 +148,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
       const AggregationStateHashTableBase &distinctify_hash_table,
       AggregationStateHashTableBase *aggregation_hash_table) const override;
 
+  void mergeGroupByHashTables(
+      const AggregationStateHashTableBase &source_hash_table,
+      AggregationStateHashTableBase *destination_hash_table) const override;
+
  private:
   friend class AggregateFunctionSum;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 26cec7f..416c4c6 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -291,6 +291,8 @@ target_link_libraries(AggregationHandle_tests
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageManager
                       quickstep_types_CharType
                       quickstep_types_DateOperatorOverloads
                       quickstep_types_DatetimeIntervalType

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
index d27b54e..fd82cba 100644
--- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp
@@ -26,6 +26,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleAvg.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DateOperatorOverloads.hpp"
 #include "types/DatetimeIntervalType.hpp"
@@ -238,6 +239,7 @@ class AggregationHandleAvgTest : public::testing::Test {
 
   std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
   std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
 };
 
 const int AggregationHandleAvgTest::kNumSamples;
@@ -417,4 +419,111 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
 }
 
+TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_avg_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          source_hash_table.get());
+
+  AggregationHandleAvg *aggregation_handle_avg_derived =
+      static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
+  // We create three keys: first is present in both the hash tables, second key
+  // is present only in the source hash table while the third key is present
+  // the destination hash table only.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_avg = 355;
+  TypedValue common_key_source_avg_val(common_key_source_avg);
+
+  const std::int64_t common_key_destination_avg = 295;
+  TypedValue common_key_destination_avg_val(common_key_destination_avg);
+
+  const std::int64_t exclusive_key_source_avg = 1;
+  TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
+
+  const std::int64_t exclusive_key_destination_avg = 1;
+  TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
+
+  std::unique_ptr<AggregationStateAvg> common_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> common_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+
+  // Create avg value states for keys.
+  aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_avg_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckAvgValue<double>(
+      (common_key_destination_avg_val.getLiteral<std::int64_t>() +
+          common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
+      *aggregation_handle_avg_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_avg_derived,
+                  *(destination_hash_table_derived->getSingleCompositeKey(
+                      exclusive_destination_key)));
+  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_avg_derived,
+                  *(source_hash_table_derived->getSingleCompositeKey(
+                      exclusive_source_key)));
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
index 7bebf6a..bf02523 100644
--- a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp
@@ -27,6 +27,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleCount.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DoubleType.hpp"
 #include "types/FloatType.hpp"
@@ -355,6 +356,7 @@ class AggregationHandleCountTest : public::testing::Test {
 
   std::unique_ptr<AggregationHandle> aggregation_handle_count_;
   std::unique_ptr<AggregationState> aggregation_handle_count_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
 };
 
 typedef AggregationHandleCountTest AggregationHandleCountDeathTest;
@@ -477,5 +479,127 @@ TEST_F(AggregationHandleCountTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kLong));
 }
 
-}  // namespace quickstep
+TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(&long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_count_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_count_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_count_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateCount> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateCount> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateCount> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateCount> *>(
+          source_hash_table.get());
+
+  // TODO(harshad) - Use TemplateUtil::CreateBoolInstantiatedInstance to
+  // generate all the combinations of the bool template arguments and test them.
+  AggregationHandleCount<true, false> *aggregation_handle_count_derived =
+      static_cast<AggregationHandleCount<true, false> *>(
+          aggregation_handle_count_.get());
+  // We create three keys: first is present in both the hash tables, second key
+  // is present only in the source hash table while the third key is present
+  // the destination hash table only.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_count = 1;
+  TypedValue common_key_source_count_val(common_key_source_count);
+
+  const std::int64_t common_key_destination_count = 1;
+  TypedValue common_key_destination_count_val(common_key_destination_count);
+
+  const std::int64_t exclusive_key_source_count = 1;
+  TypedValue exclusive_key_source_count_val(exclusive_key_source_count);
+
+  const std::int64_t exclusive_key_destination_count = 1;
+  TypedValue exclusive_key_destination_count_val(exclusive_key_destination_count);
+
+  std::unique_ptr<AggregationStateCount> common_key_source_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> common_key_destination_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> exclusive_key_source_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+  std::unique_ptr<AggregationStateCount> exclusive_key_destination_state(
+      static_cast<AggregationStateCount *>(
+          aggregation_handle_count_->createInitialState()));
+
+  // Create count value states for keys.
+  aggregation_handle_count_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_count_val);
+  std::int64_t actual_val = aggregation_handle_count_->finalize(*common_key_source_state)
+                       .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_source_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_count_val);
+  actual_val = aggregation_handle_count_->finalize(*common_key_destination_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_destination_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_count_val);
+  actual_val =
+      aggregation_handle_count_->finalize(*exclusive_key_destination_state)
+          .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_destination_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_count_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_count_val);
+  actual_val = aggregation_handle_count_->finalize(*exclusive_key_source_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_source_count_val.getLiteral<std::int64_t>(), actual_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_count_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckCountValue(
+      common_key_destination_count_val.getLiteral<std::int64_t>() +
+          common_key_source_count_val.getLiteral<std::int64_t>(),
+      *aggregation_handle_count_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckCountValue(exclusive_key_destination_count_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_count_derived,
+                  *(destination_hash_table_derived->getSingleCompositeKey(
+                      exclusive_destination_key)));
+  CheckCountValue(exclusive_key_source_count_val.getLiteral<std::int64_t>(),
+                  *aggregation_handle_count_derived,
+                  *(source_hash_table_derived->getSingleCompositeKey(
+                      exclusive_source_key)));
+}
 
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
index 027f24b..fc25e91 100644
--- a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp
@@ -29,6 +29,8 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleMax.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DatetimeLit.hpp"
@@ -413,6 +415,7 @@ class AggregationHandleMaxTest : public ::testing::Test {
 
   std::unique_ptr<AggregationHandle> aggregation_handle_max_;
   std::unique_ptr<AggregationState> aggregation_handle_max_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
 };
 
 template <>
@@ -637,4 +640,123 @@ TEST_F(AggregationHandleMaxTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
 }
 
+TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) {
+  const Type &int_non_null_type = IntType::Instance(false);
+  initializeHandle(int_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_max_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_max_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &int_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_max_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &int_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateMax> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateMax> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateMax> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateMax> *>(
+          source_hash_table.get());
+
+  AggregationHandleMax *aggregation_handle_max_derived =
+      static_cast<AggregationHandleMax *>(aggregation_handle_max_.get());
+  // We create three keys: first is present in both the hash tables, second key
+  // is present only in the source hash table while the third key is present
+  // the destination hash table only.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(0);
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(1);
+  exclusive_destination_key.emplace_back(2);
+
+  const int common_key_source_max = 3000;
+  TypedValue common_key_source_max_val(common_key_source_max);
+
+  const int common_key_destination_max = 4000;
+  TypedValue common_key_destination_max_val(common_key_destination_max);
+
+  const int exclusive_key_source_max = 100;
+  TypedValue exclusive_key_source_max_val(exclusive_key_source_max);
+
+  const int exclusive_key_destination_max = 200;
+  TypedValue exclusive_key_destination_max_val(exclusive_key_destination_max);
+
+  std::unique_ptr<AggregationStateMax> common_key_source_state(
+      static_cast<AggregationStateMax *>(
+          aggregation_handle_max_->createInitialState()));
+  std::unique_ptr<AggregationStateMax> common_key_destination_state(
+      static_cast<AggregationStateMax *>(
+          aggregation_handle_max_->createInitialState()));
+  std::unique_ptr<AggregationStateMax> exclusive_key_source_state(
+      static_cast<AggregationStateMax *>(
+          aggregation_handle_max_->createInitialState()));
+  std::unique_ptr<AggregationStateMax> exclusive_key_destination_state(
+      static_cast<AggregationStateMax *>(
+          aggregation_handle_max_->createInitialState()));
+
+  // Create max value states for keys.
+  aggregation_handle_max_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_max_val);
+  int actual_val = aggregation_handle_max_->finalize(*common_key_source_state)
+                       .getLiteral<int>();
+  EXPECT_EQ(common_key_source_max_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_max_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_max_val);
+  actual_val = aggregation_handle_max_->finalize(*common_key_destination_state)
+                   .getLiteral<int>();
+  EXPECT_EQ(common_key_destination_max_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_max_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_max_val);
+  actual_val =
+      aggregation_handle_max_->finalize(*exclusive_key_destination_state)
+          .getLiteral<int>();
+  EXPECT_EQ(exclusive_key_destination_max_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_max_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_max_val);
+  actual_val = aggregation_handle_max_->finalize(*exclusive_key_source_state)
+                   .getLiteral<int>();
+  EXPECT_EQ(exclusive_key_source_max_val.getLiteral<int>(), actual_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_max_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckMaxValue<int>(
+      common_key_destination_max_val.getLiteral<int>(),
+      *aggregation_handle_max_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckMaxValue<int>(exclusive_key_destination_max_val.getLiteral<int>(),
+                     *aggregation_handle_max_derived,
+                     *(destination_hash_table_derived->getSingleCompositeKey(
+                         exclusive_destination_key)));
+  CheckMaxValue<int>(exclusive_key_source_max_val.getLiteral<int>(),
+                     *aggregation_handle_max_derived,
+                     *(source_hash_table_derived->getSingleCompositeKey(
+                         exclusive_source_key)));
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
index eb64472..a87ace9 100644
--- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleMin.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DatetimeLit.hpp"
@@ -411,6 +412,7 @@ class AggregationHandleMinTest : public ::testing::Test {
 
   std::unique_ptr<AggregationHandle> aggregation_handle_min_;
   std::unique_ptr<AggregationState> aggregation_handle_min_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
 };
 
 template <>
@@ -634,4 +636,123 @@ TEST_F(AggregationHandleMinTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
 }
 
+TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
+  const Type &int_non_null_type = IntType::Instance(false);
+  initializeHandle(int_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_min_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_min_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &int_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_min_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &int_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateMin> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateMin> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateMin> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateMin> *>(
+          source_hash_table.get());
+
+  AggregationHandleMin *aggregation_handle_min_derived =
+      static_cast<AggregationHandleMin *>(aggregation_handle_min_.get());
+  // We create three keys: first is present in both the hash tables, second key
+  // is present only in the source hash table while the third key is present
+  // the destination hash table only.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(0);
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(1);
+  exclusive_destination_key.emplace_back(2);
+
+  const int common_key_source_min = 3000;
+  TypedValue common_key_source_min_val(common_key_source_min);
+
+  const int common_key_destination_min = 4000;
+  TypedValue common_key_destination_min_val(common_key_destination_min);
+
+  const int exclusive_key_source_min = 100;
+  TypedValue exclusive_key_source_min_val(exclusive_key_source_min);
+
+  const int exclusive_key_destination_min = 200;
+  TypedValue exclusive_key_destination_min_val(exclusive_key_destination_min);
+
+  std::unique_ptr<AggregationStateMin> common_key_source_state(
+      static_cast<AggregationStateMin *>(
+          aggregation_handle_min_->createInitialState()));
+  std::unique_ptr<AggregationStateMin> common_key_destination_state(
+      static_cast<AggregationStateMin *>(
+          aggregation_handle_min_->createInitialState()));
+  std::unique_ptr<AggregationStateMin> exclusive_key_source_state(
+      static_cast<AggregationStateMin *>(
+          aggregation_handle_min_->createInitialState()));
+  std::unique_ptr<AggregationStateMin> exclusive_key_destination_state(
+      static_cast<AggregationStateMin *>(
+          aggregation_handle_min_->createInitialState()));
+
+  // Create min value states for keys.
+  aggregation_handle_min_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_min_val);
+  int actual_val = aggregation_handle_min_->finalize(*common_key_source_state)
+                       .getLiteral<int>();
+  EXPECT_EQ(common_key_source_min_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_min_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_min_val);
+  actual_val = aggregation_handle_min_->finalize(*common_key_destination_state)
+                   .getLiteral<int>();
+  EXPECT_EQ(common_key_destination_min_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_min_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_min_val);
+  actual_val =
+      aggregation_handle_min_->finalize(*exclusive_key_destination_state)
+          .getLiteral<int>();
+  EXPECT_EQ(exclusive_key_destination_min_val.getLiteral<int>(), actual_val);
+
+  aggregation_handle_min_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_min_val);
+  actual_val = aggregation_handle_min_->finalize(*exclusive_key_source_state)
+                   .getLiteral<int>();
+  EXPECT_EQ(exclusive_key_source_min_val.getLiteral<int>(), actual_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_min_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckMinValue<int>(
+      common_key_source_min_val.getLiteral<int>(),
+      *aggregation_handle_min_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(),
+                     *aggregation_handle_min_derived,
+                     *(destination_hash_table_derived->getSingleCompositeKey(
+                         exclusive_destination_key)));
+  CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(),
+                     *aggregation_handle_min_derived,
+                     *(source_hash_table_derived->getSingleCompositeKey(
+                         exclusive_source_key)));
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 7dbbeb3..abf8a89 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -26,6 +26,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageManager.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DoubleType.hpp"
@@ -237,6 +238,7 @@ class AggregationHandleSumTest : public::testing::Test {
 
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_;
   std::unique_ptr<AggregationState> aggregation_handle_sum_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
 };
 
 const int AggregationHandleSumTest::kNumSamples;
@@ -425,4 +427,126 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) {
   EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
 }
 
+TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_sum_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_sum_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_sum_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  AggregationStateHashTable<AggregationStateSum> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+          source_hash_table.get());
+
+  AggregationHandleSum *aggregation_handle_sum_derived =
+      static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
+  // We create three keys: first is present in both the hash tables, second key
+  // is present only in the source hash table while the third key is present
+  // the destination hash table only.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_sum = 3000;
+  TypedValue common_key_source_sum_val(common_key_source_sum);
+
+  const std::int64_t common_key_destination_sum = 4000;
+  TypedValue common_key_destination_sum_val(common_key_destination_sum);
+
+  const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum;
+  TypedValue common_key_merged_val(merged_common_key);
+
+  const std::int64_t exclusive_key_source_sum = 100;
+  TypedValue exclusive_key_source_sum_val(exclusive_key_source_sum);
+
+  const std::int64_t exclusive_key_destination_sum = 200;
+  TypedValue exclusive_key_destination_sum_val(exclusive_key_destination_sum);
+
+  std::unique_ptr<AggregationStateSum> common_key_source_state(
+      static_cast<AggregationStateSum *>(
+          aggregation_handle_sum_->createInitialState()));
+  std::unique_ptr<AggregationStateSum> common_key_destination_state(
+      static_cast<AggregationStateSum *>(
+          aggregation_handle_sum_->createInitialState()));
+  std::unique_ptr<AggregationStateSum> exclusive_key_source_state(
+      static_cast<AggregationStateSum *>(
+          aggregation_handle_sum_->createInitialState()));
+  std::unique_ptr<AggregationStateSum> exclusive_key_destination_state(
+      static_cast<AggregationStateSum *>(
+          aggregation_handle_sum_->createInitialState()));
+
+  // Create sum value states for keys.
+  aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_sum_val);
+  std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state)
+                       .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_sum_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_sum_val);
+  actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_sum_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_sum_val);
+  actual_val =
+      aggregation_handle_sum_->finalize(*exclusive_key_destination_state)
+          .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+  aggregation_handle_sum_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_sum_val);
+  actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state)
+                   .getLiteral<std::int64_t>();
+  EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  CheckSumValue<std::int64_t>(
+      common_key_merged_val.getLiteral<std::int64_t>(),
+      *aggregation_handle_sum_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+                     *aggregation_handle_sum_derived,
+                     *(destination_hash_table_derived->getSingleCompositeKey(
+                         exclusive_destination_key)));
+  CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+                     *aggregation_handle_sum_derived,
+                     *(source_hash_table_derived->getSingleCompositeKey(
+                         exclusive_source_key)));
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 9440fae..7d5628d 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -216,7 +216,7 @@ class QueryContext {
    *
    * @param id The BloomFilter id.
    *
-   * @return The constant pointer to BloomFilter that is 
+   * @return The constant pointer to BloomFilter that is
    *         already created in the constructor.
    **/
   inline const BloomFilter* getBloomFilter(const bloom_filter_id id) const {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d209ceb..4878cf1 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -92,11 +92,12 @@ AggregationOperationState::AggregationOperationState(
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
 
-    group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
-        hash_table_impl_type,
-        group_by_types,
-        estimated_num_entries,
-        storage_manager_));
+    group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+        new HashTablePool(estimated_num_entries,
+                          hash_table_impl_type,
+                          group_by_types,
+                          handles_.back().get(),
+                          storage_manager)));
   } else {
     // Set up each individual aggregate in this operation.
     std::vector<const AggregateFunction*>::const_iterator agg_func_it
@@ -124,12 +125,13 @@ AggregationOperationState::AggregationOperationState(
       handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
       if (!group_by_list_.empty()) {
-        // Aggregation with GROUP BY: create a HashTable for per-group states.
-        group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable(
-            hash_table_impl_type,
-            group_by_types,
-            estimated_num_entries,
-            storage_manager_));
+        // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+            new HashTablePool(estimated_num_entries,
+                              hash_table_impl_type,
+                              group_by_types,
+                              handles_.back().get(),
+                              storage_manager)));
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
@@ -408,17 +410,17 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
       // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
       // directly into the (threadsafe) shared global HashTable for this
       // aggregate.
-      //
-      // TODO(shoban): Implement optional code path for using local hash table per
-      // block, which can be merged with global hash table for all blocks
-      // aggregated on.
+      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
+      AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+      DCHECK(agg_hash_table != nullptr);
       block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
                               predicate_.get(),
-                              group_by_hashtables_[agg_idx].get(),
+                              agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);
+      group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
     }
   }
 }
@@ -447,19 +449,65 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
+  // TODO(harshad) - The merge phase may be slower when each hash table contains
+  // large number of entries. We should find ways in which we can perform a
+  // parallel merge.
+
+  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+  // e.g. Keep merging entries from smaller hash tables to larger.
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+    if (hash_tables->size() > 1) {
+      for (int hash_table_index = 0;
+           hash_table_index < static_cast<int>(hash_tables->size() - 1);
+           ++hash_table_index) {
+        // Merge each hash table to the last hash table.
+        handles_[agg_idx]->mergeGroupByHashTables(
+            (*(*hash_tables)[hash_table_index]),
+            hash_tables->back().get());
+      }
+    }
+  }
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
     if (is_distinct_[agg_idx]) {
+      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
+      auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      DCHECK(hash_tables != nullptr);
+      if (hash_tables->empty()) {
+        // We may have a case where hash_tables is empty, e.g. no input blocks.
+        // However for aggregateOnDistinctifyHashTableForGroupBy to work
+        // correctly, we should create an empty group by hash table.
+        AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+        group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
+        hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      }
+      DCHECK(hash_tables->back() != nullptr);
+      AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
+      DCHECK(agg_hash_table != nullptr);
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
           *distinctify_hashtables_[agg_idx],
-          group_by_hashtables_[agg_idx].get());
+          agg_hash_table);
     }
 
+    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+    DCHECK(hash_tables != nullptr);
+    if (hash_tables->empty()) {
+      // We may have a case where hash_tables is empty, e.g. no input blocks.
+      // However for aggregateOnDistinctifyHashTableForGroupBy to work
+      // correctly, we should create an empty group by hash table.
+      AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
+      group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
+      hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+    }
+    AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
+    DCHECK(agg_hash_table != nullptr);
     ColumnVector* agg_result_col =
-        handles_[agg_idx]->finalizeHashTable(*group_by_hashtables_[agg_idx],
+        handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
                                              &group_by_keys);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c3a1278..0199749 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -31,6 +31,7 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
+#include "storage/HashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
@@ -209,6 +210,9 @@ class AggregationOperationState {
   // hash table to prevent multiple lookups.
   std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
 
+  // A vector of group by hash table pools, one for each group by clause.
+  std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index a3093df..87a5e54 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -187,6 +187,7 @@ add_library(quickstep_storage_HashTable_proto ${storage_HashTable_proto_srcs})
 add_library(quickstep_storage_HashTableBase ../empty_src.cpp HashTableBase.hpp)
 add_library(quickstep_storage_HashTableFactory HashTableFactory.cpp HashTableFactory.hpp)
 add_library(quickstep_storage_HashTableKeyManager ../empty_src.cpp HashTableKeyManager.hpp)
+add_library(quickstep_storage_HashTablePool ../empty_src.cpp HashTablePool.hpp)
 add_library(quickstep_storage_IndexSubBlock ../empty_src.cpp IndexSubBlock.hpp)
 add_library(quickstep_storage_IndexSubBlockDescriptionFactory ../empty_src.cpp IndexSubBlockDescriptionFactory.hpp)
 add_library(quickstep_storage_InsertDestination InsertDestination.cpp InsertDestination.hpp)
@@ -252,6 +253,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
+                      quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -662,6 +664,13 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
                       quickstep_types_TypedValue
                       quickstep_types_operations_comparisons_ComparisonUtil
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_HashTablePool
+                      glog
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_HashTableBase
+                      quickstep_threading_SpinMutex
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_storage_IndexSubBlock
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_PredicateCost
@@ -1012,6 +1021,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTableKeyManager
+                      quickstep_storage_HashTablePool
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_IndexSubBlockDescriptionFactory
                       quickstep_storage_InsertDestination

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
new file mode 100644
index 0000000..c16d0f1
--- /dev/null
+++ b/storage/HashTablePool.hpp
@@ -0,0 +1,166 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin\u2014Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
+
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "threading/SpinMutex.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. This class
+ *        has similar functionality as InsertDestination, but for checking out
+ *        HashTables. A worker thread can check out a hash table for insertion,
+ *        perform the insertions and return the hash table to the pool. While
+ *        one thread is using a hash table, no other thread can access it.
+ **/
+class HashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointer of types which form the group by
+   *        key.
+   * @param agg_handle The aggregation handle.
+   * @param storage_manager A pointer to the storage manager.
+   *
+   * @note The estimate of number of entries is quite inaccurate at this time.
+   *       If we go by the current estimate, each hash table demands much
+   *       larger space than it actually needs, which causes the system to
+   *       either trigger evictions or worse - run out of memory. To fix this
+   *       issue, we divide the estimate by 100. The division will not affect
+   *       correctness, however it may allocate some hash tables smaller space
+   *       than their requirement, causing them to be resized during build
+   *       phase, which has a performance penalty.
+   **/
+  HashTablePool(const std::size_t estimated_num_entries,
+                const HashTableImplType hash_table_impl_type,
+                const std::vector<const Type *> &group_by_types,
+                AggregationHandle *agg_handle,
+                StorageManager *storage_manager)
+      : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  /**
+   * @brief Check out a hash table for insertion.
+   *
+   * @return A hash table pointer.
+   **/
+  AggregationStateHashTableBase* getHashTable() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!hash_tables_.empty()) {
+        std::unique_ptr<AggregationStateHashTableBase> ret_hash_table(
+            std::move(hash_tables_.back()));
+        hash_tables_.pop_back();
+        DCHECK(ret_hash_table != nullptr);
+        return ret_hash_table.release();
+      }
+    }
+    return createNewHashTable();
+  }
+
+  /**
+   * @brief Return a previously checked out hash table.
+   *
+   * @param hash_table A pointer to the checked out hash table.
+   **/
+  void returnHashTable(AggregationStateHashTableBase *hash_table) {
+    SpinMutexLock lock(mutex_);
+    hash_tables_.push_back(
+        std::unique_ptr<AggregationStateHashTableBase>(hash_table));
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words
+   *          the hash table pool is in read-only state.
+   *
+   * @param All the hash tables in the pool.
+   *
+   **/
+  const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+ private:
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  inline std::size_t reduceEstimatedCardinality(
+      const std::size_t original_estimate) const {
+    if (original_estimate < kEstimateReductionFactor) {
+      return original_estimate;
+    } else {
+      DCHECK_GT(kEstimateReductionFactor, 0u);
+      return original_estimate / kEstimateReductionFactor;
+    }
+  }
+
+  static constexpr std::size_t kEstimateReductionFactor = 100;
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  AggregationHandle *agg_handle_;
+  StorageManager *storage_manager_;
+
+  SpinMutex mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index dfc95b8..5d91052 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -183,7 +183,8 @@ StorageManager::~StorageManager() {
        it != blocks_.end();
        ++it) {
     if (it->second.block->isDirty()) {
-      LOG(WARNING) << "Block with ID " << BlockIdUtil::ToString(it->first)
+      LOG(WARNING) << (it->second.block->isBlob() ? "Blob " : "Block ")
+                   << "with ID " << BlockIdUtil::ToString(it->first)
                    << " is dirty during StorageManager shutdown";
     }
     delete it->second.block;