You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/20 20:30:32 UTC

[4/8] incubator-quickstep git commit: Initial commit for QUICKSTEP-28 and QUICKSTEP-29.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 3f6e23a..073b813 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,7 +59,7 @@ namespace quickstep {
 
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
-    const std::vector<const AggregateFunction*> &aggregate_functions,
+    const std::vector<const AggregateFunction *> &aggregate_functions,
     std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
     std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
@@ -78,11 +78,14 @@ AggregationOperationState::AggregationOperationState(
   DCHECK(aggregate_functions.size() == arguments_.size());
 
   // Get the types of GROUP BY expressions for creating HashTables below.
-  std::vector<const Type*> group_by_types;
+  std::vector<const Type *> group_by_types;
   for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
     group_by_types.emplace_back(&group_by_element->getType());
   }
 
+  std::vector<AggregationHandle *> group_by_handles;
+  group_by_handles.reserve(aggregate_functions.size());
+
   if (aggregate_functions.size() == 0) {
     // If there is no aggregation function, then it is a distinctify operation
     // on the group-by expressions.
@@ -91,26 +94,28 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back(new AggregationHandleDistinct());
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
-
-    group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-        new HashTablePool(estimated_num_entries,
-                          hash_table_impl_type,
-                          group_by_types,
-                          handles_.back().get(),
-                          storage_manager)));
+    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                     hash_table_impl_type,
+                                                     group_by_types,
+                                                     {1},
+                                                     handles_,
+                                                     storage_manager));
   } else {
     // Set up each individual aggregate in this operation.
-    std::vector<const AggregateFunction*>::const_iterator agg_func_it
-        = aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
-        = arguments_.begin();
+    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+        aggregate_functions.begin();
+    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
+        args_it = arguments_.begin();
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
-    std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
-        = distinctify_hash_table_impl_types.begin();
-    for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
+    std::vector<HashTableImplType>::const_iterator
+        distinctify_hash_table_impl_types_it =
+            distinctify_hash_table_impl_types.begin();
+    std::vector<std::size_t> payload_sizes;
+    for (; agg_func_it != aggregate_functions.end();
+         ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
       // AggregationHandle.
-      std::vector<const Type*> argument_types;
+      std::vector<const Type *> argument_types;
       for (const std::unique_ptr<const Scalar> &argument : *args_it) {
         argument_types.emplace_back(&argument->getType());
       }
@@ -125,13 +130,13 @@ AggregationOperationState::AggregationOperationState(
       handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
       if (!group_by_list_.empty()) {
-        // Aggregation with GROUP BY: create a HashTable pool for per-group states.
-        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-            new HashTablePool(estimated_num_entries,
-                              hash_table_impl_type,
-                              group_by_types,
-                              handles_.back().get(),
-                              storage_manager)));
+        // Aggregation with GROUP BY: combined payload is partially updated in
+        // the presence of DISTINCT.
+        if (*is_distinct_it) {
+          handles_.back()->blockUpdate();
+        }
+        group_by_handles.emplace_back(handles_.back());
+        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
@@ -143,40 +148,60 @@ AggregationOperationState::AggregationOperationState(
         std::vector<attribute_id> local_arguments_as_attributes;
         local_arguments_as_attributes.reserve(args_it->size());
         for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+          const attribute_id argument_id =
+              argument->getAttributeIdForValueAccessor();
           if (argument_id == -1) {
             local_arguments_as_attributes.clear();
             break;
           } else {
-            DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+            DCHECK_EQ(input_relation_.getID(),
+                      argument->getRelationIdForValueAccessor());
             local_arguments_as_attributes.push_back(argument_id);
           }
         }
 
-        arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
+        arguments_as_attributes_.emplace_back(
+            std::move(local_arguments_as_attributes));
 #endif
       }
 
-      // Initialize the corresponding distinctify hash table if this is a DISTINCT
+      // Initialize the corresponding distinctify hash table if this is a
+      // DISTINCT
       // aggregation.
       if (*is_distinct_it) {
-        std::vector<const Type*> key_types(group_by_types);
-        key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
-        // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+        std::vector<const Type *> key_types(group_by_types);
+        key_types.insert(
+            key_types.end(), argument_types.begin(), argument_types.end());
+        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+        // estimating
         // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value during
+        // for each distinct aggregation an estimated_num_distinct_keys value
+        // during
         // query optimization, if it worths.
         distinctify_hashtables_.emplace_back(
-            handles_.back()->createDistinctifyHashTable(
+            AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
                 key_types,
                 estimated_num_entries,
+                {0},
+                {},
                 storage_manager));
         ++distinctify_hash_table_impl_types_it;
       } else {
         distinctify_hashtables_.emplace_back(nullptr);
       }
     }
+
+    if (!group_by_handles.empty()) {
+      // Aggregation with GROUP BY: create a HashTable pool for per-group
+      // states.
+      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                       hash_table_impl_type,
+                                                       group_by_types,
+                                                       payload_sizes,
+                                                       group_by_handles,
+                                                       storage_manager));
+    }
   }
 }
 
@@ -187,7 +212,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
-  std::vector<const AggregateFunction*> aggregate_functions;
+  std::vector<const AggregateFunction *> aggregate_functions;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
   std::vector<bool> is_distinct;
   std::vector<HashTableImplType> distinctify_hash_table_impl_types;
@@ -200,62 +225,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
 
     arguments.emplace_back();
     arguments.back().reserve(agg_proto.argument_size());
-    for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) {
+    for (int argument_idx = 0; argument_idx < agg_proto.argument_size();
+         ++argument_idx) {
       arguments.back().emplace_back(ScalarFactory::ReconstructFromProto(
-          agg_proto.argument(argument_idx),
-          database));
+          agg_proto.argument(argument_idx), database));
     }
 
     is_distinct.emplace_back(agg_proto.is_distinct());
 
     if (agg_proto.is_distinct()) {
       distinctify_hash_table_impl_types.emplace_back(
-          HashTableImplTypeFromProto(
-              proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index)));
+          HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types(
+              distinctify_hash_table_impl_type_index)));
       ++distinctify_hash_table_impl_type_index;
     }
   }
 
   std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
-  for (int group_by_idx = 0;
-       group_by_idx < proto.group_by_expressions_size();
+  for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size();
        ++group_by_idx) {
     group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto(
-        proto.group_by_expressions(group_by_idx),
-        database));
+        proto.group_by_expressions(group_by_idx), database));
   }
 
   unique_ptr<Predicate> predicate;
   if (proto.has_predicate()) {
     predicate.reset(
-        PredicateFactory::ReconstructFromProto(proto.predicate(),
-                                               database));
+        PredicateFactory::ReconstructFromProto(proto.predicate(), database));
   }
 
-  return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
-                                       aggregate_functions,
-                                       std::move(arguments),
-                                       std::move(is_distinct),
-                                       std::move(group_by_expressions),
-                                       predicate.release(),
-                                       proto.estimated_num_entries(),
-                                       HashTableImplTypeFromProto(proto.hash_table_impl_type()),
-                                       distinctify_hash_table_impl_types,
-                                       storage_manager);
+  return new AggregationOperationState(
+      database.getRelationSchemaById(proto.relation_id()),
+      aggregate_functions,
+      std::move(arguments),
+      std::move(is_distinct),
+      std::move(group_by_expressions),
+      predicate.release(),
+      proto.estimated_num_entries(),
+      HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+      distinctify_hash_table_impl_types,
+      storage_manager);
 }
 
-bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto,
-                                             const CatalogDatabaseLite &database) {
+bool AggregationOperationState::ProtoIsValid(
+    const serialization::AggregationOperationState &proto,
+    const CatalogDatabaseLite &database) {
   if (!proto.IsInitialized() ||
       !database.hasRelationWithId(proto.relation_id()) ||
       (proto.aggregates_size() < 0)) {
     return false;
   }
 
-  std::size_t num_distinctify_hash_tables = proto.distinctify_hash_table_impl_types_size();
+  std::size_t num_distinctify_hash_tables =
+      proto.distinctify_hash_table_impl_types_size();
   std::size_t distinctify_hash_table_impl_type_index = 0;
   for (int i = 0; i < proto.aggregates_size(); ++i) {
-    if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) {
+    if (!AggregateFunctionFactory::ProtoIsValid(
+            proto.aggregates(i).function())) {
       return false;
     }
 
@@ -266,16 +292,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
     for (int argument_idx = 0;
          argument_idx < proto.aggregates(i).argument_size();
          ++argument_idx) {
-      if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx),
-                                       database)) {
+      if (!ScalarFactory::ProtoIsValid(
+              proto.aggregates(i).argument(argument_idx), database)) {
         return false;
       }
     }
 
     if (proto.aggregates(i).is_distinct()) {
-      if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables ||
+      if (distinctify_hash_table_impl_type_index >=
+              num_distinctify_hash_tables ||
           !serialization::HashTableImplType_IsValid(
-              proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) {
+              proto.distinctify_hash_table_impl_types(
+                  distinctify_hash_table_impl_type_index))) {
         return false;
       }
     }
@@ -288,8 +316,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
   }
 
   if (proto.group_by_expressions_size() > 0) {
-    if (!proto.has_hash_table_impl_type()
-        || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) {
+    if (!proto.has_hash_table_impl_type() ||
+        !serialization::HashTableImplType_IsValid(
+            proto.hash_table_impl_type())) {
       return false;
     }
   }
@@ -311,7 +340,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) {
   }
 }
 
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeAggregate(
+    InsertDestination *output_destination) {
   if (group_by_list_.empty()) {
     finalizeSingleState(output_destination);
   } else {
@@ -330,19 +360,19 @@ void AggregationOperationState::mergeSingleState(
   }
 }
 
-void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) {
+void AggregationOperationState::aggregateBlockSingleState(
+    const block_id input_block) {
   // Aggregate per-block state for each aggregate.
   std::vector<std::unique_ptr<AggregationState>> local_state;
 
-  BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+  BlockReference block(
+      storage_manager_->getBlock(input_block, input_relation_));
 
   // If there is a filter predicate, 'reuse_matches' holds the set of matching
   // tuples so that it can be reused across multiple aggregates (i.e. we only
   // pay the cost of evaluating the predicate once).
   std::unique_ptr<TupleIdSequence> reuse_matches;
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
     // If all arguments are attributes of the input relation, elide a copy.
@@ -365,12 +395,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
       local_state.emplace_back(nullptr);
     } else {
       // Call StorageBlock::aggregate() to actually do the aggregation.
-      local_state.emplace_back(
-          block->aggregate(*handles_[agg_idx],
-                           arguments_[agg_idx],
-                           local_arguments_as_attributes,
-                           predicate_.get(),
-                           &reuse_matches));
+      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
+                                                arguments_[agg_idx],
+                                                local_arguments_as_attributes,
+                                                predicate_.get(),
+                                                &reuse_matches));
     }
   }
 
@@ -378,8 +407,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
   mergeSingleState(local_state);
 }
 
-void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) {
-  BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+void AggregationOperationState::aggregateBlockHashTable(
+    const block_id input_block) {
+  BlockReference block(
+      storage_manager_->getBlock(input_block, input_relation_));
 
   // If there is a filter predicate, 'reuse_matches' holds the set of matching
   // tuples so that it can be reused across multiple aggregates (i.e. we only
@@ -391,11 +422,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // GROUP BY expressions once).
   std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
 
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
+      // expression
       // values and the aggregation arguments together as keys directly into the
       // (threadsafe) shared global distinctify HashTable for this aggregate.
       block->aggregateDistinct(*handles_[agg_idx],
@@ -406,45 +436,54 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                &reuse_group_by_vectors);
-    } else {
-      // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-      // directly into the (threadsafe) shared global HashTable for this
-      // aggregate.
-      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
-      DCHECK(agg_hash_table != nullptr);
-      block->aggregateGroupBy(*handles_[agg_idx],
-                              arguments_[agg_idx],
-                              group_by_list_,
-                              predicate_.get(),
-                              agg_hash_table,
-                              &reuse_matches,
-                              &reuse_group_by_vectors);
-      group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table);
     }
   }
+
+  // Call StorageBlock::aggregateGroupBy() once for all the aggregates,
+  // accumulating this block's values into a hash table borrowed from the
+  // pool; the table is handed back to the pool afterwards.
+  DCHECK(group_by_hashtable_pool_ != nullptr);
+  AggregationStateHashTableBase *agg_hash_table =
+      group_by_hashtable_pool_->getHashTableFast();
+  DCHECK(agg_hash_table != nullptr);
+  block->aggregateGroupBy(arguments_,
+                          group_by_list_,
+                          predicate_.get(),
+                          agg_hash_table,
+                          &reuse_matches,
+                          &reuse_group_by_vectors);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }
 
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeSingleState(
+    InsertDestination *output_destination) {
   // Simply build up a Tuple from the finalized values for each aggregate and
   // insert it in '*output_destination'.
   std::vector<TypedValue> attribute_values;
 
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
       single_states_[agg_idx].reset(
-          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
+              *distinctify_hashtables_[agg_idx]));
     }
 
-    attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+    attribute_values.emplace_back(
+        handles_[agg_idx]->finalize(*single_states_[agg_idx]));
   }
 
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
+void AggregationOperationState::mergeGroupByHashTables(
+    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
+  HashTableMergerFast merger(dst);
+  (static_cast<FastHashTable<true, false, true, false> *>(src))
+      ->forEachCompositeKeyFast(&merger);
+}
+
+void AggregationOperationState::finalizeHashTable(
+    InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
@@ -455,60 +494,57 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
 
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
-    if (hash_tables->size() > 1) {
-      for (int hash_table_index = 0;
-           hash_table_index < static_cast<int>(hash_tables->size() - 1);
-           ++hash_table_index) {
-        // Merge each hash table to the last hash table.
-        handles_[agg_idx]->mergeGroupByHashTables(
-            (*(*hash_tables)[hash_table_index]),
-            hash_tables->back().get());
-      }
+
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  if (hash_tables->size() > 1) {
+    for (int hash_table_index = 0;
+         hash_table_index < static_cast<int>(hash_tables->size() - 1);
+         ++hash_table_index) {
+      // Merge each hash table to the last hash table.
+      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
+                             hash_tables->back().get());
     }
   }
 
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  for (std::size_t agg_idx = 0;
-       agg_idx < handles_.size();
-       ++agg_idx) {
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr);
-      auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      DCHECK(group_by_hashtable_pool_ != nullptr);
+      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
       DCHECK(hash_tables != nullptr);
       if (hash_tables->empty()) {
         // We may have a case where hash_tables is empty, e.g. no input blocks.
         // However for aggregateOnDistinctifyHashTableForGroupBy to work
         // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
-        group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+        AggregationStateHashTableBase *new_hash_table =
+            group_by_hashtable_pool_->getHashTableFast();
+        group_by_hashtable_pool_->returnHashTable(new_hash_table);
+        hash_tables = group_by_hashtable_pool_->getAllHashTables();
       }
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
       DCHECK(agg_hash_table != nullptr);
+      handles_[agg_idx]->allowUpdate();
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx],
-          agg_hash_table);
+          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
     }
 
-    auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
     DCHECK(hash_tables != nullptr);
     if (hash_tables->empty()) {
       // We may have a case where hash_tables is empty, e.g. no input blocks.
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable();
-      group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables();
+      AggregationStateHashTableBase *new_hash_table =
+          group_by_hashtable_pool_->getHashTableFast();
+      group_by_hashtable_pool_->returnHashTable(new_hash_table);
+      hash_tables = group_by_hashtable_pool_->getAllHashTables();
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);
-    ColumnVector* agg_result_col =
-        handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
-                                             &group_by_keys);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *agg_hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
@@ -526,16 +562,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
   for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
     const Type &group_by_type = group_by_element->getType();
     if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     } else {
-      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index ecd116b..cbbfc22 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -102,16 +102,17 @@ class AggregationOperationState {
    *        tables. Single aggregation state (when GROUP BY list is not
    *        specified) is not allocated using memory from storage manager.
    */
-  AggregationOperationState(const CatalogRelationSchema &input_relation,
-                            const std::vector<const AggregateFunction*> &aggregate_functions,
-                            std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
-                            std::vector<bool> &&is_distinct,
-                            std::vector<std::unique_ptr<const Scalar>> &&group_by,
-                            const Predicate *predicate,
-                            const std::size_t estimated_num_entries,
-                            const HashTableImplType hash_table_impl_type,
-                            const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-                            StorageManager *storage_manager);
+  AggregationOperationState(
+      const CatalogRelationSchema &input_relation,
+      const std::vector<const AggregateFunction *> &aggregate_functions,
+      std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+      std::vector<bool> &&is_distinct,
+      std::vector<std::unique_ptr<const Scalar>> &&group_by,
+      const Predicate *predicate,
+      const std::size_t estimated_num_entries,
+      const HashTableImplType hash_table_impl_type,
+      const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
+      StorageManager *storage_manager);
 
   ~AggregationOperationState() {}
 
@@ -143,8 +144,9 @@ class AggregationOperationState {
    *        in.
    * @return Whether proto is fully-formed and valid.
    **/
-  static bool ProtoIsValid(const serialization::AggregationOperationState &proto,
-                           const CatalogDatabaseLite &database);
+  static bool ProtoIsValid(
+      const serialization::AggregationOperationState &proto,
+      const CatalogDatabaseLite &database);
 
   /**
    * @brief Compute aggregates on the tuples of the given storage block,
@@ -165,10 +167,16 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
+  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                                     AggregationStateHashTableBase *dst);
+
+  int dflag;
+
  private:
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
-  void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  void mergeSingleState(
+      const std::vector<std::unique_ptr<AggregationState>> &local_state);
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
@@ -185,7 +193,8 @@ class AggregationOperationState {
 
   // Each individual aggregate in this operation has an AggregationHandle and
   // some number of Scalar arguments.
-  std::vector<std::unique_ptr<AggregationHandle>> handles_;
+  //  std::vector<std::unique_ptr<AggregationHandle>> handles_;
+  std::vector<AggregationHandle *> handles_;
   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
@@ -193,7 +202,8 @@ class AggregationOperationState {
   std::vector<bool> is_distinct_;
 
   // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+      distinctify_hashtables_;
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all an aggregate's argument expressions are simply attributes in
@@ -208,10 +218,11 @@ class AggregationOperationState {
   //
   // TODO(shoban): We should ideally store the aggregation state together in one
   // hash table to prevent multiple lookups.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+      group_by_hashtables_;
 
-  // A vector of group by hash table pools, one for each group by clause.
-  std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+  // The single pool of hash tables used for GROUP BY aggregation.
+  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
   StorageManager *storage_manager_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 65a7975..f05cc46 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -198,6 +198,9 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
+add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
+add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
+add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -626,6 +629,53 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_utility_BloomFilter
+                      quickstep_utility_HashPair
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastHashTableFactory
+                      glog
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastSeparateChainingHashTable
+                      quickstep_storage_HashTable
+                      quickstep_storage_HashTable_proto
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
+                      quickstep_storage_LinearOpenAddressingHashTable
+                      quickstep_storage_SeparateChainingHashTable
+                      quickstep_storage_SimpleScalarSeparateChainingHashTable
+                      quickstep_storage_TupleReference
+                      quickstep_types_TypeFactory
+                      quickstep_utility_BloomFilter
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_HashTable
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableKeyManager
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_utility_Alignment
+                      quickstep_utility_Macros
+                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -711,6 +761,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
                       quickstep_threading_SpinMutex
                       quickstep_utility_Macros
@@ -1098,6 +1150,9 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase