You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/05 03:54:00 UTC

[09/11] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
index 6e6d188..798ba76 100644
--- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
@@ -32,8 +32,9 @@
 #include "expressions/aggregation/AggregationHandleMin.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DatetimeLit.hpp"
@@ -50,10 +51,7 @@
 #include "types/VarCharType.hpp"
 #include "types/YearMonthIntervalType.hpp"
 #include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
 
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
@@ -222,34 +220,6 @@ class AggregationHandleMinTest : public ::testing::Test {
   }
 
   template <typename GenericType>
-  void checkAggregationMinGenericColumnVector() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
-            .isNull());
-
-    typename GenericType::cpptype min;
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType>(type, &min));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_min_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckMinValue<typename GenericType::cpptype>(
-        min, *aggregation_handle_min_, *cv_state);
-
-    aggregation_handle_min_->mergeStates(*cv_state,
-                                         aggregation_handle_min_state_.get());
-    CheckMinValue<typename GenericType::cpptype>(
-        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename GenericType>
   void checkAggregationMinGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
     initializeHandle(type);
@@ -265,7 +235,8 @@ class AggregationHandleMinTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_min_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -277,7 +248,6 @@ class AggregationHandleMinTest : public ::testing::Test {
     CheckMinValue<typename GenericType::cpptype>(
         min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   template <typename StringType>
   void checkAggregationMinString() {
@@ -382,33 +352,6 @@ class AggregationHandleMinTest : public ::testing::Test {
   }
 
   template <typename StringType, typename ColumnVectorType>
-  void checkAggregationMinStringColumnVector() {
-    const StringType &type = StringType::Instance(10, true);
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
-            .isNull());
-
-    std::string min;
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorString<ColumnVectorType>(type, &min));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_min_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckMinString(min, *aggregation_handle_min_, *cv_state);
-
-    aggregation_handle_min_->mergeStates(*cv_state,
-                                         aggregation_handle_min_state_.get());
-    CheckMinString(
-        min, *aggregation_handle_min_, *aggregation_handle_min_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename StringType, typename ColumnVectorType>
   void checkAggregationMinStringValueAccessor() {
     const StringType &type = StringType::Instance(10, true);
     initializeHandle(type);
@@ -423,7 +366,8 @@ class AggregationHandleMinTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_min_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -434,7 +378,6 @@ class AggregationHandleMinTest : public ::testing::Test {
     CheckMinString(
         min, *aggregation_handle_min_, *aggregation_handle_min_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   std::unique_ptr<AggregationHandle> aggregation_handle_min_;
   std::unique_ptr<AggregationState> aggregation_handle_min_state_;
@@ -511,43 +454,6 @@ TEST_F(AggregationHandleMinTest, VarCharTypeTest) {
   checkAggregationMinString<VarCharType>();
 }
 
-TEST_F(AggregationHandleMinTest, IntTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleMinTest, LongTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleMinTest, FloatTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleMinTest, DoubleTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleMinTest, DatetimeTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<DatetimeType>();
-}
-
-TEST_F(AggregationHandleMinTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleMinTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationMinGenericColumnVector<YearMonthIntervalType>();
-}
-
-TEST_F(AggregationHandleMinTest, CharTypeColumnVectorTest) {
-  checkAggregationMinStringColumnVector<CharType, NativeColumnVector>();
-}
-
-TEST_F(AggregationHandleMinTest, VarCharTypeColumnVectorTest) {
-  checkAggregationMinStringColumnVector<VarCharType, IndirectColumnVector>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 TEST_F(AggregationHandleMinTest, IntTypeValueAccessorTest) {
   checkAggregationMinGenericValueAccessor<IntType>();
 }
@@ -583,7 +489,6 @@ TEST_F(AggregationHandleMinTest, CharTypeValueAccessorTest) {
 TEST_F(AggregationHandleMinTest, VarCharTypeValueAccessorTest) {
   checkAggregationMinStringValueAccessor<VarCharType, IndirectColumnVector>();
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 #ifdef QUICKSTEP_DEBUG
 TEST_F(AggregationHandleMinDeathTest, WrongTypeTest) {
@@ -685,28 +590,25 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
   initializeHandle(int_non_null_type);
   storage_manager_.reset(new StorageManager("./test_min_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
-          {aggregation_handle_min_.get()->getPayloadSize()},
           {aggregation_handle_min_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &int_non_null_type),
           10,
-          {aggregation_handle_min_.get()->getPayloadSize()},
           {aggregation_handle_min_.get()},
           storage_manager_.get()));
 
-  AggregationStateFastHashTable *destination_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(
-          destination_hash_table.get());
+  PackedPayloadHashTable *destination_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
 
-  AggregationStateFastHashTable *source_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+  PackedPayloadHashTable *source_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(source_hash_table.get());
 
   AggregationHandleMin *aggregation_handle_min_derived =
       static_cast<AggregationHandleMin *>(aggregation_handle_min_.get());
@@ -776,47 +678,47 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
   memcpy(buffer + 1,
          common_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_min_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(common_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          common_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_min_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+  destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_min_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_min_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
-                                                      buffer);
+  destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+                                                     buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  AggregationOperationState::mergeGroupByHashTables(
-      source_hash_table.get(), destination_hash_table.get());
+  HashTableMerger merger(destination_hash_table_derived);
+  source_hash_table_derived->forEachCompositeKey(&merger);
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckMinValue<int>(
       common_key_source_min_val.getLiteral<int>(),
-      aggregation_handle_min_derived->finalizeHashTableEntryFast(
+      aggregation_handle_min_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(common_key) +
           1));
-  CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(),
-                     aggregation_handle_min_derived->finalizeHashTableEntryFast(
-                         destination_hash_table_derived->getSingleCompositeKey(
-                             exclusive_destination_key) +
-                         1));
-  CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(),
-                     aggregation_handle_min_derived->finalizeHashTableEntryFast(
-                         source_hash_table_derived->getSingleCompositeKey(
-                             exclusive_source_key) +
-                         1));
+  CheckMinValue<int>(
+      exclusive_key_destination_min_val.getLiteral<int>(),
+      aggregation_handle_min_derived->finalizeHashTableEntry(
+          destination_hash_table_derived->getSingleCompositeKey(
+              exclusive_destination_key) + 1));
+  CheckMinValue<int>(
+      exclusive_key_source_min_val.getLiteral<int>(),
+      aggregation_handle_min_derived->finalizeHashTableEntry(
+          source_hash_table_derived->getSingleCompositeKey(
+              exclusive_source_key) + 1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 1d1c084..31a35a0 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -29,8 +29,9 @@
 #include "expressions/aggregation/AggregationHandleSum.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/CharType.hpp"
 #include "types/DatetimeIntervalType.hpp"
 #include "types/DoubleType.hpp"
@@ -45,10 +46,7 @@
 #include "types/VarCharType.hpp"
 #include "types/YearMonthIntervalType.hpp"
 #include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
 
 #include "gtest/gtest.h"
 
@@ -186,36 +184,6 @@ class AggregationHandleSumTest : public ::testing::Test {
   }
 
   template <typename GenericType, typename PrecisionType>
-  void checkAggregationSumGenericColumnVector() {
-    const GenericType &type = GenericType::Instance(true);
-
-    initializeHandle(type);
-    EXPECT_TRUE(
-        aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
-            .isNull());
-
-    typename PrecisionType::cpptype sum;
-    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
-    column_vectors.emplace_back(
-        createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
-            type, &sum));
-
-    std::unique_ptr<AggregationState> cv_state(
-        aggregation_handle_sum_->accumulateColumnVectors(column_vectors));
-
-    // Test the state generated directly by accumulateColumnVectors(), and also
-    // test after merging back.
-    CheckSumValue<typename PrecisionType::cpptype>(
-        sum, *aggregation_handle_sum_, *cv_state);
-
-    aggregation_handle_sum_->mergeStates(*cv_state,
-                                         aggregation_handle_sum_state_.get());
-    CheckSumValue<typename PrecisionType::cpptype>(
-        sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
-  }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  template <typename GenericType, typename PrecisionType>
   void checkAggregationSumGenericValueAccessor() {
     const GenericType &type = GenericType::Instance(true);
 
@@ -233,7 +201,8 @@ class AggregationHandleSumTest : public ::testing::Test {
 
     std::unique_ptr<AggregationState> va_state(
         aggregation_handle_sum_->accumulateValueAccessor(
-            accessor.get(), std::vector<attribute_id>(1, 0)));
+            {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+            ValueAccessorMultiplexer(accessor.get())));
 
     // Test the state generated directly by accumulateValueAccessor(), and also
     // test after merging back.
@@ -245,7 +214,6 @@ class AggregationHandleSumTest : public ::testing::Test {
     CheckSumValue<typename PrecisionType::cpptype>(
         sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
   }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
   std::unique_ptr<AggregationHandle> aggregation_handle_sum_;
   std::unique_ptr<AggregationState> aggregation_handle_sum_state_;
@@ -306,33 +274,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeTest) {
   checkAggregationSumGeneric<YearMonthIntervalType, YearMonthIntervalType>();
 }
 
-TEST_F(AggregationHandleSumTest, IntTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<IntType, LongType>();
-}
-
-TEST_F(AggregationHandleSumTest, LongTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<LongType, LongType>();
-}
-
-TEST_F(AggregationHandleSumTest, FloatTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<FloatType, DoubleType>();
-}
-
-TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<DoubleType, DoubleType>();
-}
-
-TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<DatetimeIntervalType,
-                                         DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) {
-  checkAggregationSumGenericColumnVector<YearMonthIntervalType,
-                                         YearMonthIntervalType>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 TEST_F(AggregationHandleSumTest, IntTypeValueAccessorTest) {
   checkAggregationSumGenericValueAccessor<IntType, LongType>();
 }
@@ -358,7 +299,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) {
   checkAggregationSumGenericValueAccessor<YearMonthIntervalType,
                                           YearMonthIntervalType>();
 }
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
 #ifdef QUICKSTEP_DEBUG
 TEST_F(AggregationHandleSumDeathTest, CharTypeTest) {
@@ -464,28 +404,25 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   initializeHandle(long_non_null_type);
   storage_manager_.reset(new StorageManager("./test_sum_data"));
   std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_sum_.get()->getPayloadSize()},
           {aggregation_handle_sum_.get()},
           storage_manager_.get()));
   std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
-      AggregationStateFastHashTableFactory::CreateResizable(
+      AggregationStateHashTableFactory::CreateResizable(
           HashTableImplType::kSeparateChaining,
           std::vector<const Type *>(1, &long_non_null_type),
           10,
-          {aggregation_handle_sum_.get()->getPayloadSize()},
           {aggregation_handle_sum_.get()},
           storage_manager_.get()));
 
-  AggregationStateFastHashTable *destination_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(
-          destination_hash_table.get());
+  PackedPayloadHashTable *destination_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
 
-  AggregationStateFastHashTable *source_hash_table_derived =
-      static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+  PackedPayloadHashTable *source_hash_table_derived =
+      static_cast<PackedPayloadHashTable *>(source_hash_table.get());
 
   AggregationHandleSum *aggregation_handle_sum_derived =
       static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
@@ -563,49 +500,47 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
   memcpy(buffer + 1,
          common_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_sum_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(common_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          common_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_sum_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(common_key, buffer);
+  destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_source_state.get()->getPayloadAddress(),
          aggregation_handle_sum_.get()->getPayloadSize());
-  source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+  source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
 
   memcpy(buffer + 1,
          exclusive_key_destination_state.get()->getPayloadAddress(),
          aggregation_handle_sum_.get()->getPayloadSize());
-  destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
-                                                      buffer);
+  destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+                                                     buffer);
 
   EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
   EXPECT_EQ(2u, source_hash_table_derived->numEntries());
 
-  AggregationOperationState::mergeGroupByHashTables(
-      source_hash_table.get(), destination_hash_table.get());
+  HashTableMerger merger(destination_hash_table_derived);
+  source_hash_table_derived->forEachCompositeKey(&merger);
 
   EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
 
   CheckSumValue<std::int64_t>(
       common_key_merged_val.getLiteral<std::int64_t>(),
-      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+      aggregation_handle_sum_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(common_key) +
           1));
   CheckSumValue<std::int64_t>(
       exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
-      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+      aggregation_handle_sum_derived->finalizeHashTableEntry(
           destination_hash_table_derived->getSingleCompositeKey(
-              exclusive_destination_key) +
-          1));
+              exclusive_destination_key) + 1));
   CheckSumValue<std::int64_t>(
       exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
-      aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+      aggregation_handle_sum_derived->finalizeHashTableEntry(
           source_hash_table_derived->getSingleCompositeKey(
-              exclusive_source_key) +
-          1));
+              exclusive_source_key) + 1));
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 895c2ea..ed0f99c 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
   }
 
   /**
-   * @brief Destroy the payloads from the aggregation hash tables.
-   *
-   * @warning After calling these methods, the hash table will be in an invalid
-   *          state. No other operation should be performed on them.
-   *
-   * @param id The ID of the AggregationOperationState.
-   **/
-  inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
-    DCHECK_LT(id, aggregation_states_.size());
-    DCHECK(aggregation_states_[id]);
-    aggregation_states_[id]->destroyAggregationHashTablePayload();
-  }
-
-  /**
    * @brief Whether the given GeneratorFunctionHandle id is valid.
    *
    * @param id The GeneratorFunctionHandle id.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7f90e11..a755832 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
@@ -125,6 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RelationalOperator
@@ -145,6 +147,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
+                      quickstep_types_TypeID
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6918313..67c8739 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,6 +49,7 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -105,6 +106,7 @@
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
 #include "relational_operators/InsertOperator.hpp"
 #include "relational_operators/NestedLoopsJoinOperator.hpp"
 #include "relational_operators/RelationalOperator.hpp"
@@ -126,6 +128,7 @@
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
@@ -371,6 +374,110 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
   }
 }
 
+bool ExecutionGenerator::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) const {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact;
+  bool max_value_stat_is_exact;
+  const TypedValue min_value =
+      cost_model_for_aggregation_->findMinValueStat(
+          aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value =
+      cost_model_for_aggregation_->findMaxValueStat(
+          aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the upbound (e.g. by checking memory size) instead of
+  //    hardcoding it here.
+  const std::int64_t kGroupSizeUpbound = 1000000000;
+  if (min_cpp_value < 0 ||
+      max_cpp_value > kGroupSizeUpbound ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    switch (agg_func->getAggregate().getAggregationID()) {
+      case AggregationID::kCount:  // Fall through
+      case AggregationID::kSum:
+        break;
+      default:
+        return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1) {
+      return false;
+    }
+
+    if (arguments.size() == 1) {
+      switch (arguments.front()->getValueType().getTypeID()) {
+        case TypeID::kInt:  // Fall through
+        case TypeID::kLong:
+        case TypeID::kFloat:
+        case TypeID::kDouble:
+          break;
+        default:
+          return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
 void ExecutionGenerator::convertNamedExpressions(
     const std::vector<E::NamedExpressionPtr> &named_expressions,
     S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1475,6 +1582,8 @@ void ExecutionGenerator::convertAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
 
+  bool use_parallel_initialization = false;
+
   std::vector<const Type*> group_by_types;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
     unique_ptr<const Scalar> execution_group_by_expression;
@@ -1495,9 +1604,28 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (!group_by_types.empty()) {
-    // Right now, only SeparateChaining is supported.
-    aggr_state_proto->set_hash_table_impl_type(
-        serialization::HashTableImplType::SEPARATE_CHAINING);
+    const std::size_t estimated_num_groups =
+        cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
+
+    std::size_t max_num_groups;
+    const bool can_use_collision_free_aggregation =
+        canUseCollisionFreeAggregation(physical_plan,
+                                       estimated_num_groups,
+                                       &max_num_groups);
+
+    if (can_use_collision_free_aggregation) {
+      aggr_state_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+      aggr_state_proto->set_estimated_num_entries(max_num_groups);
+      use_parallel_initialization = true;
+    } else {
+      // Otherwise, use SeparateChaining.
+      aggr_state_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+      aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
+    }
+  } else {
+    aggr_state_proto->set_estimated_num_entries(1uL);
   }
 
   for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1535,10 +1663,6 @@ void ExecutionGenerator::convertAggregate(
     aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
   }
 
-  const std::size_t estimated_num_groups =
-      cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
-  aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
   const QueryPlan::DAGNodeIndex aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new AggregationOperator(
@@ -1553,6 +1677,18 @@ void ExecutionGenerator::convertAggregate(
                                          false /* is_pipeline_breaker */);
   }
 
+  if (use_parallel_initialization) {
+    const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+        execution_plan_->addRelationalOperator(
+            new InitializeAggregationOperator(
+                query_handle_->query_id(),
+                aggr_state_index));
+
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         initialize_aggregation_operator_index,
+                                         true /* is_pipeline_breaker */);
+  }
+
   // Create InsertDestination proto.
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index eba6eee..987f11a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -37,6 +38,7 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -204,6 +206,22 @@ class ExecutionGenerator {
   std::string getNewRelationName();
 
   /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param exact_num_groups If collision-free aggregation is applicable, the
+   *        pointed content of this pointer will be set as the maximum possible
+   *        number of groups that the collision-free hash table need to hold.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups) const;
+
+  /**
    * @brief Sets up the info of the CatalogRelation represented by TableReference.
    *        TableReference is not converted to any operator.
    *
@@ -427,7 +445,7 @@ class ExecutionGenerator {
   /**
    * @brief The cost model to use for estimating aggregation hash table size.
    */
-  std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
 
   /**
    * @brief The cost model to use for estimating join hash table size.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c18dc77..df4114d 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -47,6 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator
             FinalizeAggregationOperator.cpp
             FinalizeAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationOperator
+            InitializeAggregationOperator.cpp
+            InitializeAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp)
 add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
             NestedLoopsJoinOperator.cpp
@@ -254,6 +257,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationOperator
+                      glog
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -548,6 +562,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RebuildWorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
 }
 
 void DestroyAggregationStateWorkOrder::execute() {
-  // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
-  // from the destroyAggregationState call. The reason is that the aggregation
-  // hash tables don't own the AggregationHandle objects. However the hash table
-  // class requires the handles for destroying the payload (see the
-  // destroyPayload methods in AggregationHandle classes). Therefore, we first
-  // destroy the payloads in the hash table and then destroy the hash table.
-  query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
   query_context_->destroyAggregationState(aggr_state_index_);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 0cbf635..77b4b97 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
     AggregationOperationState *agg_state =
         query_context->getAggregationState(aggr_state_index_);
     DCHECK(agg_state != nullptr);
-    for (int part_id = 0;
-         part_id < static_cast<int>(agg_state->getNumPartitions());
+    for (std::size_t part_id = 0;
+         part_id < agg_state->getNumFinalizationPartitions();
          ++part_id) {
       container->addNormalWorkOrder(
           new FinalizeAggregationWorkOrder(
               query_id_,
+              part_id,
               agg_state,
-              query_context->getInsertDestination(output_destination_index_),
-              part_id),
+              query_context->getInsertDestination(output_destination_index_)),
           op_index_);
     }
   }
@@ -61,7 +61,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
 // TODO(quickstep-team) : Think about how the number of partitions could be
 // accessed in this function. Until then, we can't use partitioned aggregation
-// with the distributed version.
+// finalization with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
@@ -80,11 +80,7 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 }
 
 void FinalizeAggregationWorkOrder::execute() {
-  if (state_->isAggregatePartitioned()) {
-    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
-  } else {
-    state_->finalizeAggregate(output_destination_);
-  }
+  state_->finalizeAggregate(partition_id_, output_destination_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index ae7127a..3c209b1 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @note InsertWorkOrder takes ownership of \c state.
    *
    * @param query_id The ID of the query to which this operator belongs.
+   * @param partition_id The partition ID for which the Finalize aggregation
+   *        work order is issued.
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
-   * @param part_id The partition ID for which the Finalize aggregation work
-   *        order is issued. Ignore if aggregation is not partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
+                               const std::size_t partition_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination,
-                               const int part_id = -1)
+                               InsertDestination *output_destination)
       : WorkOrder(query_id),
+        partition_id_(partition_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)),
-        part_id_(part_id) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
   void execute() override;
 
  private:
+  const std::size_t partition_id_;
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
-  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
new file mode 100644
index 0000000..b1063ad
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationOperator.hpp"
+
+#include <cstddef>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (!started_) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_);
+    DCHECK(agg_state != nullptr);
+
+    for (std::size_t part_id = 0;
+         part_id < agg_state->getNumInitializationPartitions();
+         ++part_id) {
+      container->addNormalWorkOrder(
+          new InitializeAggregationWorkOrder(query_id_,
+                                             part_id,
+                                             agg_state),
+          op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+// TODO(quickstep-team) : Think about how the number of partitions could be
+// accessed in this function. Until then, we can't use partitioned aggregation
+// initialization with the distributed version.
+bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  LOG(FATAL) << "Not supported";
+}
+
+void InitializeAggregationWorkOrder::execute() {
+  state_->initialize(partition_id_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
new file mode 100644
index 0000000..58d848b
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which initializes an AggregationOperationState.
+ **/
+class InitializeAggregationOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of this query.
+   * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
+   **/
+  InitializeAggregationOperator(const std::size_t query_id,
+                                const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        aggr_state_index_(aggr_state_index),
+        started_(false) {}
+
+  ~InitializeAggregationOperator() override {}
+
+  std::string getName() const override {
+    return "InitializeAggregationOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by InitializeAggregationOperator.
+ **/
+class InitializeAggregationWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param partition_id The partition ID for which the work order is issued.
+   * @param state The AggregationOperationState to be initialized.
+   */
+  InitializeAggregationWorkOrder(const std::size_t query_id,
+                                 const std::size_t partition_id,
+                                 AggregationOperationState *state)
+      : WorkOrder(query_id),
+        partition_id_(partition_id),
+        state_(DCHECK_NOTNULL(state)) {}
+
+  ~InitializeAggregationWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const std::size_t partition_id_;
+
+  AggregationOperationState *state_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5e8d03d..306bd1a 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -186,6 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index;
       return new FinalizeAggregationWorkOrder(
           proto.query_id(),
+          0uL,
           query_context->getAggregationState(proto.GetExtension(
               serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
           query_context->getInsertDestination(