You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 23:20:05 UTC

[11/13] incubator-quickstep git commit: Initial commit.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..5de2653 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,15 +39,17 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
 #include "storage/HashTableFactory.hpp"
+#include "storage/HashTableBase.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
@@ -80,50 +82,63 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
-      is_aggregate_partitioned_(checkAggregatePartitioned(
-          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+      is_aggregate_collision_free_(false),
+      is_aggregate_partitioned_(false),
       predicate_(predicate),
-      group_by_list_(std::move(group_by)),
-      arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
       storage_manager_(storage_manager) {
+  if (!group_by.empty()) {
+    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
+      is_aggregate_collision_free_ = true;
+    } else {
+      is_aggregate_partitioned_ = checkAggregatePartitioned(
+          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    }
+  }
+
   // Sanity checks: each aggregate has a corresponding list of arguments.
-  DCHECK(aggregate_functions.size() == arguments_.size());
+  DCHECK(aggregate_functions.size() == arguments.size());
 
   // Get the types of GROUP BY expressions for creating HashTables below.
-  std::vector<const Type *> group_by_types;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    group_by_types.emplace_back(&group_by_element->getType());
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    group_by_types_.emplace_back(&group_by_element->getType());
   }
 
-  std::vector<AggregationHandle *> group_by_handles;
-  group_by_handles.clear();
+  // Prepare group-by element attribute ids and non-trivial expressions.
+  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    const attribute_id attr_id =
+        group_by_element->getAttributeIdForValueAccessor();
+    if (attr_id == kInvalidAttributeID) {
+      const attribute_id non_trivial_attr_id =
+          -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+      non_trivial_expressions_.emplace_back(group_by_element.release());
+      group_by_key_ids_.emplace_back(non_trivial_attr_id);
+    } else {
+      group_by_key_ids_.emplace_back(attr_id);
+    }
+  }
 
   if (aggregate_functions.size() == 0) {
     // If there is no aggregation function, then it is a distinctify operation
     // on the group-by expressions.
-    DCHECK_GT(group_by_list_.size(), 0u);
+    DCHECK_GT(group_by_key_ids_.size(), 0u);
 
     handles_.emplace_back(new AggregationHandleDistinct());
-    arguments_.push_back({});
     is_distinct_.emplace_back(false);
     group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                      hash_table_impl_type,
-                                                     group_by_types,
-                                                     {1},
-                                                     handles_,
+                                                     group_by_types_,
+                                                     {handles_.front().get()},
                                                      storage_manager));
   } else {
+    std::vector<AggregationHandle *> group_by_handles;
+
     // Set up each individual aggregate in this operation.
     std::vector<const AggregateFunction *>::const_iterator agg_func_it =
         aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
-        args_it = arguments_.begin();
+    std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+        args_it = arguments.begin();
     std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
-    std::vector<HashTableImplType>::const_iterator
-        distinctify_hash_table_impl_types_it =
-            distinctify_hash_table_impl_types.begin();
-    std::vector<std::size_t> payload_sizes;
     for (; agg_func_it != aggregate_functions.end();
          ++agg_func_it, ++args_it, ++is_distinct_it) {
       // Get the Types of this aggregate's arguments so that we can create an
@@ -133,6 +148,22 @@ AggregationOperationState::AggregationOperationState(
         argument_types.emplace_back(&argument->getType());
       }
 
+      // Prepare argument attribute ids and non-trivial expressions.
+      std::vector<attribute_id> argument_ids;
+      for (std::unique_ptr<const Scalar> &argument : *args_it) {
+        const attribute_id attr_id =
+            argument->getAttributeIdForValueAccessor();
+        if (attr_id == kInvalidAttributeID) {
+          const attribute_id non_trivial_attr_id =
+              -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
+          non_trivial_expressions_.emplace_back(argument.release());
+          argument_ids.emplace_back(non_trivial_attr_id);
+        } else {
+          argument_ids.emplace_back(attr_id);
+        }
+      }
+      argument_ids_.emplace_back(std::move(argument_ids));
+
       // Sanity checks: aggregate function exists and can apply to the specified
       // arguments.
       DCHECK(*agg_func_it != nullptr);
@@ -142,85 +173,43 @@ AggregationOperationState::AggregationOperationState(
       // to do actual aggregate computation.
       handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
-      if (!group_by_list_.empty()) {
+      if (!group_by_key_ids_.empty()) {
         // Aggregation with GROUP BY: combined payload is partially updated in
         // the presence of DISTINCT.
         if (*is_distinct_it) {
-          handles_.back()->blockUpdate();
+          LOG(FATAL) << "Distinct aggregation not supported";
         }
-        group_by_handles.emplace_back(handles_.back());
-        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+        group_by_handles.emplace_back(handles_.back().get());
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-        // See if all of this aggregate's arguments are attributes in the input
-        // relation. If so, remember the attribute IDs so that we can do copy
-        // elision when actually performing the aggregation.
-        std::vector<attribute_id> local_arguments_as_attributes;
-        local_arguments_as_attributes.reserve(args_it->size());
-        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id =
-              argument->getAttributeIdForValueAccessor();
-          if (argument_id == -1) {
-            local_arguments_as_attributes.clear();
-            break;
-          } else {
-            DCHECK_EQ(input_relation_.getID(),
-                      argument->getRelationIdForValueAccessor());
-            local_arguments_as_attributes.push_back(argument_id);
-          }
-        }
-
-        arguments_as_attributes_.emplace_back(
-            std::move(local_arguments_as_attributes));
-#endif
       }
+    }
 
-      // Initialize the corresponding distinctify hash table if this is a
-      // DISTINCT aggregation.
-      if (*is_distinct_it) {
-        std::vector<const Type *> key_types(group_by_types);
-        key_types.insert(
-            key_types.end(), argument_types.begin(), argument_types.end());
-        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating the number of entries in the distinctify hash table.
-        // We may estimate for each distinct aggregation an
-        // estimated_num_distinct_keys value during query optimization, if it's
-        // worth.
-        distinctify_hashtables_.emplace_back(
-            AggregationStateFastHashTableFactory::CreateResizable(
-                *distinctify_hash_table_impl_types_it,
-                key_types,
+    // Aggregation with GROUP BY: create a HashTable pool.
+    if (!group_by_key_ids_.empty()) {
+      if (is_aggregate_collision_free_) {
+        collision_free_hashtable_.reset(
+            AggregationStateHashTableFactory::CreateResizable(
+                hash_table_impl_type,
+                group_by_types_,
                 estimated_num_entries,
-                {0},
-                {},
+                group_by_handles,
                 storage_manager));
-        ++distinctify_hash_table_impl_types_it;
-      } else {
-        distinctify_hashtables_.emplace_back(nullptr);
-      }
-    }
-
-    if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool.
-      if (!is_aggregate_partitioned_) {
-        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                         hash_table_impl_type,
-                                                         group_by_types,
-                                                         payload_sizes,
-                                                         group_by_handles,
-                                                         storage_manager));
-      } else {
+      } else if (is_aggregate_partitioned_) {
         partitioned_group_by_hashtable_pool_.reset(
             new PartitionedHashTablePool(estimated_num_entries,
                                          FLAGS_num_aggregation_partitions,
                                          hash_table_impl_type,
-                                         group_by_types,
-                                         payload_sizes,
+                                         group_by_types_,
                                          group_by_handles,
                                          storage_manager));
+      } else {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types_,
+                                                         group_by_handles,
+                                                         storage_manager));
       }
     }
   }
@@ -269,7 +258,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
         proto.group_by_expressions(group_by_idx), database));
   }
 
-  unique_ptr<Predicate> predicate;
+  std::unique_ptr<Predicate> predicate;
   if (proto.has_predicate()) {
     predicate.reset(
         PredicateFactory::ReconstructFromProto(proto.predicate(), database));
@@ -353,33 +342,72 @@ bool AggregationOperationState::ProtoIsValid(
   return true;
 }
 
-void AggregationOperationState::aggregateBlock(const block_id input_block,
-                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
-  if (group_by_list_.empty()) {
-    aggregateBlockSingleState(input_block);
+std::size_t AggregationOperationState::getNumPartitions() const {
+  if (is_aggregate_collision_free_) {  // Count comes from the collision-free table.
+    return static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->getNumFinalizationPartitions();
+  } else if (is_aggregate_partitioned_) {  // Count comes from the partitioned pool.
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+  } else  {
+    return 1u;  // Neither collision-free nor partitioned: one partition.
+  }
+}
+
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->getNumInitializationPartitions();
   } else {
-    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+    return 0u;
   }
 }
 
-void AggregationOperationState::finalizeAggregate(
-    InsertDestination *output_destination) {
-  if (group_by_list_.empty()) {
-    finalizeSingleState(output_destination);
+void AggregationOperationState::initializeState(const std::size_t partition_id) {
+  if (is_aggregate_collision_free_) {
+    static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    finalizeHashTable(output_destination);
+    LOG(FATAL) << "AggregationOperationState::initializeState() "
+               << "is not supported by this aggregation";
   }
 }
 
-void AggregationOperationState::mergeSingleState(
-    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
-  DEBUG_ASSERT(local_state.size() == single_states_.size());
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (!is_distinct_[agg_idx]) {
-      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
-                                     single_states_[agg_idx].get());
+bool AggregationOperationState::checkAggregatePartitioned(
+    const std::size_t estimated_num_groups,
+    const std::vector<bool> &is_distinct,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const std::vector<const AggregateFunction *> &aggregate_functions) const {
+  // Without any aggregate function, partitioned aggregation is not used.
+  if (aggregate_functions.empty()) {
+    return false;
+  }
+  // If any aggregate is DISTINCT, the aggregation cannot be partitioned,
+  // so bail out as soon as one is found.
+  for (auto distinct : is_distinct) {
+    if (distinct) {
+      return false;
+    }
+  }
+  // No DISTINCT aggregate is involved. Partitioning additionally requires
+  // at least one GROUP BY expression.
+  if (group_by.empty()) {
+    return false;
+  }
+  // There are GROUP BYs without DISTINCT. Partition only when the estimated
+  // number of groups exceeds the configured threshold flag.
+  return estimated_num_groups >
+         static_cast<std::size_t>(
+             FLAGS_partition_aggregation_num_groups_threshold);
+  return false;  // NOTE(review): unreachable; the statement above always returns.
+}
+
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
+  if (group_by_key_ids_.empty()) {
+    aggregateBlockSingleState(input_block);
+  } else {
+    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+  }
 }
 
 void AggregationOperationState::aggregateBlockSingleState(
@@ -392,114 +420,137 @@ void AggregationOperationState::aggregateBlockSingleState(
 
   std::unique_ptr<TupleIdSequence> matches;
   if (predicate_ != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor());
     matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
   }
 
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-    // If all arguments are attributes of the input relation, elide a copy.
-    if (!arguments_as_attributes_[agg_idx].empty()) {
-      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+  const auto &tuple_store = block->getTupleStorageSubBlock();
+  std::unique_ptr<ValueAccessor> accessor(
+      tuple_store.createValueAccessor(matches.get()));
+
+  ColumnVectorsValueAccessor non_trivial_results;
+  if (!non_trivial_expressions_.empty()) {
+    SubBlocksReference sub_blocks_ref(tuple_store,
+                                      block->getIndices(),
+                                      block->getIndicesConsistent());
+    for (const auto &expression : non_trivial_expressions_) {
+      non_trivial_results.addColumn(
+          expression->getAllValues(accessor.get(), &sub_blocks_ref));
     }
-#endif
-    if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
-      // directly into the (threadsafe) shared global distinctify HashTable
-      // for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               local_arguments_as_attributes,
-                               {}, /* group_by */
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               nullptr /* reuse_group_by_vectors */);
-      local_state.emplace_back(nullptr);
+  }
+
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    const auto &argument_ids = argument_ids_[agg_idx];
+    const auto &handle = handles_[agg_idx];
+
+    AggregationState *state;
+    if (argument_ids.empty()) {
+      // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+      state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples()
+                                                           : matches->size());
     } else {
-      // Call StorageBlock::aggregate() to actually do the aggregation.
-      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
-                                                arguments_[agg_idx],
-                                                local_arguments_as_attributes,
-                                                matches.get()));
+      // Have the AggregationHandle actually do the aggregation.
+      state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids);
     }
+    local_state.emplace_back(state);
   }
 
   // Merge per-block aggregation states back with global state.
   mergeSingleState(local_state);
 }
 
+void AggregationOperationState::mergeSingleState(
+    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+  DEBUG_ASSERT(local_state.size() == single_states_.size());  // One local state per global state.
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    if (!is_distinct_[agg_idx]) {  // Aggregates flagged DISTINCT are not merged here.
+      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+                                     single_states_[agg_idx].get());  // Fold the local state into the global one.
+    }
+  }
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
+  HashTableMergerFast merger(dst);  // Merger functor constructed over the destination table.
+  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)  // NOTE(review): unchecked downcast of src.
+      ->forEach(&merger);  // Hands the merger to src's forEach(); presumably visits every src entry -- confirm.
+}
+
 void AggregationOperationState::aggregateBlockHashTable(
     const block_id input_block,
     LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
+  const auto &tuple_store = block->getTupleStorageSubBlock();
+  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+  std::unique_ptr<ValueAccessor> shared_accessor;
+  ValueAccessor *accessor = base_accessor.get();
 
   // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
   // as the existence map for the tuples.
   std::unique_ptr<TupleIdSequence> matches;
   if (predicate_ != nullptr) {
     matches.reset(block->getMatchesForPredicate(predicate_.get()));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
   }
   if (lip_filter_adaptive_prober != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
-    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
   }
 
-  // This holds values of all the GROUP BY attributes so that the can be reused
-  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
-  // GROUP BY expressions once).
-  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
-      // expression
-      // values and the aggregation arguments together as keys directly into the
-      // (threadsafe) shared global distinctify HashTable for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               nullptr, /* arguments_as_attributes */
-                               group_by_list_,
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               &reuse_group_by_vectors);
+  std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
+  if (!non_trivial_expressions_.empty()) {
+    non_trivial_results.reset(new ColumnVectorsValueAccessor());
+    SubBlocksReference sub_blocks_ref(tuple_store,
+                                      block->getIndices(),
+                                      block->getIndicesConsistent());
+    for (const auto &expression : non_trivial_expressions_) {
+      non_trivial_results->addColumn(
+          expression->getAllValues(accessor, &sub_blocks_ref));
     }
   }
 
-  if (!is_aggregate_partitioned_) {
-    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-    // directly into the (threadsafe) shared global HashTable for this
-    // aggregate.
-    DCHECK(group_by_hashtable_pool_ != nullptr);
-    AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTableFast();
-    DCHECK(agg_hash_table != nullptr);
-    block->aggregateGroupBy(arguments_,
-                            group_by_list_,
-                            matches.get(),
-                            agg_hash_table,
-                            &reuse_group_by_vectors);
-    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  accessor->beginIterationVirtual();
+
+  // TODO
+  if (is_aggregate_collision_free_) {
+    aggregateBlockHashTableImplCollisionFree(
+        accessor, non_trivial_results.get());
+  } else if (is_aggregate_partitioned_) {
+    aggregateBlockHashTableImplPartitioned(
+        accessor, non_trivial_results.get());
   } else {
-    ColumnVectorsValueAccessor temp_result;
-    // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-    std::vector<attribute_id> argument_ids;
+    aggregateBlockHashTableImplThreadPrivate(
+        accessor, non_trivial_results.get());
+  }
+}
 
-    // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-    std::vector<attribute_id> key_ids;
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+    ValueAccessor *accessor,  // Accessor supplied by aggregateBlockHashTable().
+    ColumnVectorsValueAccessor *aux_accessor) {  // Non-trivial expression results; null when there are none.
+  DCHECK(collision_free_hashtable_ != nullptr);
+
+  collision_free_hashtable_->upsertValueAccessor(argument_ids_,  // Upsert directly into the single collision-free table.
+                                                 group_by_key_ids_,
+                                                 accessor,
+                                                 aux_accessor);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    // TODO(jianqiao): handle group-by keys that reside in non_trivial_results.
     const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-    block->aggregateGroupByPartitioned(
-        arguments_,
-        group_by_list_,
-        matches.get(),
-        num_partitions,
-        &temp_result,
-        &argument_ids,
-        &key_ids,
-        &reuse_group_by_vectors);
+
     // Compute the partitions for the tuple formed by group by values.
     std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
     partition_membership.resize(num_partitions);
@@ -507,32 +558,57 @@ void AggregationOperationState::aggregateBlockHashTable(
     // Create a tuple-id sequence for each partition.
     for (std::size_t partition = 0; partition < num_partitions; ++partition) {
       partition_membership[partition].reset(
-          new TupleIdSequence(temp_result.getEndPosition()));
+          new TupleIdSequence(accessor->getEndPosition()));
     }
 
     // Iterate over ValueAccessor for each tuple,
     // set a bit in the appropriate TupleIdSequence.
-    temp_result.beginIteration();
-    while (temp_result.next()) {
+    while (accessor->next()) {
       // We need a unique_ptr because getTupleWithAttributes() uses "new".
-      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+      std::unique_ptr<Tuple> curr_tuple(
+          accessor->getTupleWithAttributes(group_by_key_ids_));
       const std::size_t curr_tuple_partition_id =
           curr_tuple->getTupleHash() % num_partitions;
       partition_membership[curr_tuple_partition_id]->set(
-          temp_result.getCurrentPosition(), true);
+          accessor->getCurrentPosition(), true);
     }
-    // For each partition, create an adapter around Value Accessor and
-    // TupleIdSequence.
-    std::vector<std::unique_ptr<
-        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
-    adapter.resize(num_partitions);
+    // Aggregate each partition.
     for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
-          *(partition_membership)[partition]));
+      std::unique_ptr<ValueAccessor> adapter(
+          accessor->createSharedTupleIdSequenceAdapter(
+              *(partition_membership)[partition]));
       partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKeyFast(
-              argument_ids, adapter[partition].get(), key_ids, true);
+          ->upsertValueAccessor(argument_ids_,
+                                group_by_key_ids_,
+                                adapter.get(),
+                                aux_accessor);
     }
+  });
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+    ValueAccessor *accessor,  // Accessor supplied by aggregateBlockHashTable().
+    ColumnVectorsValueAccessor *aux_accessor) {  // Non-trivial expression results; null when there are none.
+  DCHECK(group_by_hashtable_pool_ != nullptr);
+
+  AggregationStateHashTableBase *agg_hash_table =
+      group_by_hashtable_pool_->getHashTable();  // Obtain a hash table from the pool.
+
+  agg_hash_table->upsertValueAccessor(argument_ids_,
+                                      group_by_key_ids_,
+                                      accessor,
+                                      aux_accessor);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);  // Hand the table back to the pool when done.
+}
+
+void AggregationOperationState::finalizeAggregate(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (group_by_key_ids_.empty()) {
+    DCHECK_EQ(0u, partition_id);
+    finalizeSingleState(output_destination);
+  } else {
+    finalizeHashTable(partition_id, output_destination);
   }
 }
 
@@ -543,12 +619,6 @@ void AggregationOperationState::finalizeSingleState(
   std::vector<TypedValue> attribute_values;
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      single_states_[agg_idx].reset(
-          handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
-              *distinctify_hashtables_[agg_idx]));
-    }
-
     attribute_values.emplace_back(
         handles_[agg_idx]->finalize(*single_states_[agg_idx]));
   }
@@ -556,80 +626,79 @@ void AggregationOperationState::finalizeSingleState(
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
-  HashTableMergerFast merger(dst);
-  (static_cast<FastHashTable<true, false, true, false> *>(src))
-      ->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::finalizeHashTable(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (is_aggregate_collision_free_) {
+    finalizeHashTableImplCollisionFree(partition_id, output_destination);
+  } else if (is_aggregate_partitioned_) {
+    finalizeHashTableImplPartitioned(partition_id, output_destination);
+  } else {
+    DCHECK_EQ(0u, partition_id);
+    finalizeHashTableImplThreadPrivate(output_destination);
+  }
 }
 
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+    const std::size_t partition_id,
     InsertDestination *output_destination) {
-  // Each element of 'group_by_keys' is a vector of values for a particular
-  // group (which is also the prefix of the finalized Tuple for that group).
-  std::vector<std::vector<TypedValue>> group_by_keys;
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  CollisionFreeAggregationStateHashTable *hash_table =
+      static_cast<CollisionFreeAggregationStateHashTable *>(
+          collision_free_hashtable_.get());
 
-  // TODO(harshad) - The merge phase may be slower when each hash table contains
-  // large number of entries. We should find ways in which we can perform a
-  // parallel merge.
+  // TODO
+  const std::size_t max_length =
+      hash_table->getNumTuplesInPartition(partition_id);
+  ColumnVectorsValueAccessor complete_result;
 
-  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
-  // e.g. Keep merging entries from smaller hash tables to larger.
+  DCHECK_EQ(1u, group_by_types_.size());
+  const Type *key_type = group_by_types_.front();
+  DCHECK(NativeColumnVector::UsableForType(*key_type));
 
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  if (hash_tables->size() > 1) {
-    for (int hash_table_index = 0;
-         hash_table_index < static_cast<int>(hash_tables->size() - 1);
-         ++hash_table_index) {
-      // Merge each hash table to the last hash table.
-      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
-                             hash_tables->back().get());
+  std::unique_ptr<NativeColumnVector> key_cv(
+      new NativeColumnVector(*key_type, max_length));
+  hash_table->finalizeKey(partition_id, key_cv.get());
+  complete_result.addColumn(key_cv.release());
+
+  for (std::size_t i = 0; i < handles_.size(); ++i) {
+    if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
+      DCHECK_EQ(1u, handles_.size());
+      break;
     }
+
+    const Type *result_type = handles_[i]->getResultType();
+    DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+    std::unique_ptr<NativeColumnVector> result_cv(
+        new NativeColumnVector(*result_type, max_length));
+    hash_table->finalizeState(partition_id, i, result_cv.get());
+    complete_result.addColumn(result_cv.release());
   }
 
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
+  AggregationStateHashTableBase *hash_table =
+      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pool_ != nullptr);
-      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      DCHECK(hash_tables != nullptr);
-      if (hash_tables->empty()) {
-        // We may have a case where hash_tables is empty, e.g. no input blocks.
-        // However for aggregateOnDistinctifyHashTableForGroupBy to work
-        // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pool_->getHashTableFast();
-        group_by_hashtable_pool_->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      }
-      DCHECK(hash_tables->back() != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-      DCHECK(agg_hash_table != nullptr);
-      handles_[agg_idx]->allowUpdate();
-      handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
-    }
-
-    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(hash_tables != nullptr);
-    if (hash_tables->empty()) {
-      // We may have a case where hash_tables is empty, e.g. no input blocks.
-      // However for aggregateOnDistinctifyHashTableForGroupBy to work
-      // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pool_->getHashTableFast();
-      group_by_hashtable_pool_->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-    AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-    DCHECK(agg_hash_table != nullptr);
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
+        *hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
+  hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,23 +709,20 @@ void AggregationOperationState::finalizeHashTable(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
       NativeColumnVector *element_cv =
-          new NativeColumnVector(group_by_type, group_by_keys.size());
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     } else {
       IndirectColumnVector *element_cv =
-          new IndirectColumnVector(group_by_type, group_by_keys.size());
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;
@@ -676,42 +742,44 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::destroyAggregationHashTablePayload() {
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
-      nullptr;
-  if (!is_aggregate_partitioned_) {
-    if (group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-  } else {
-    if (partitioned_group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
-    }
-  }
-  if (all_hash_tables != nullptr) {
-    for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
-      (*all_hash_tables)[ht_index]->destroyPayload();
-    }
-  }
-}
-
-void AggregationOperationState::finalizeAggregatePartitioned(
-    const std::size_t partition_id, InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+    InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
+  // TODO(harshad) - The merge phase may be slower when each hash table contains
+  // a large number of entries. We should find ways in which we can perform a
+  // parallel merge.
+
+  // TODO(harshad) - Find heuristics for a faster merge, even in a single
+  // thread, e.g. keep merging entries from smaller hash tables to larger ones.
+
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+      hash_tables->back().release());
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+    hash_table->destroyPayload();
+  }
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    AggregationStateHashTableBase *hash_table =
-        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *hash_table, &group_by_keys, agg_idx);
+        *final_hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
+  final_hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,19 +790,22 @@ void AggregationOperationState::finalizeAggregatePartitioned(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     } else {
-      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 591e3a1..44803fc 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,7 +33,9 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/PartitionedHashTablePool.hpp"
+#include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/ConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 
 #include "gflags/gflags.h"
@@ -43,9 +45,11 @@ namespace quickstep {
 class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
+class ColumnVectorsValueAccessor;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
+class TupleIdSequence;
 
 DECLARE_int32(num_aggregation_partitions);
 DECLARE_int32(partition_aggregation_num_groups_threshold);
@@ -166,127 +170,99 @@ class AggregationOperationState {
    *        the block.
    **/
   void aggregateBlock(const block_id input_block,
-                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
 
   /**
    * @brief Generate the final results for the aggregates managed by this
    *        AggregationOperationState and write them out to StorageBlock(s).
    *
+   * @param partition_id The partition id of this finalize operation.
    * @param output_destination An InsertDestination where the finalized output
    *        tuple(s) from this aggregate are to be written.
    **/
-  void finalizeAggregate(InsertDestination *output_destination);
-
-  /**
-   * @brief Destroy the payloads in the aggregation hash tables.
-   **/
-  void destroyAggregationHashTablePayload();
-
-  /**
-   * @brief Generate the final results for the aggregates managed by this
-   *        AggregationOperationState and write them out to StorageBlock(s).
-   *        In this implementation, each thread picks a hash table belonging to
-   *        a partition and writes its values to StorageBlock(s). There is no
-   *        need to merge multiple hash tables in one, because there is no
-   *        overlap in the keys across two hash tables.
-   *
-   * @param partition_id The ID of the partition for which finalize is being
-   *        performed.
-   * @param output_destination An InsertDestination where the finalized output
-   *        tuple(s) from this aggregate are to be written.
-   **/
-  void finalizeAggregatePartitioned(
-      const std::size_t partition_id, InsertDestination *output_destination);
-
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
-
-  bool isAggregatePartitioned() const {
-    return is_aggregate_partitioned_;
-  }
+  void finalizeAggregate(const std::size_t partition_id,
+                         InsertDestination *output_destination);
 
   /**
    * @brief Get the number of partitions to be used for the aggregation.
    *        For non-partitioned aggregations, we return 1.
    **/
-  std::size_t getNumPartitions() const {
-    return is_aggregate_partitioned_
-               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
-               : 1;
-  }
+  std::size_t getNumPartitions() const;
 
-  int dflag;
+  std::size_t getNumInitializationPartitions() const;
+
+  void initializeState(const std::size_t partition_id);
 
  private:
-  // Merge locally (per storage block) aggregated states with global aggregation
-  // states.
-  void mergeSingleState(
-      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const;
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
   void aggregateBlockHashTable(const block_id input_block,
                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
-  void finalizeSingleState(InsertDestination *output_destination);
-  void finalizeHashTable(InsertDestination *output_destination);
+  // Merge locally (per storage block) aggregated states with global aggregation
+  // states.
+  void mergeSingleState(
+      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                              AggregationStateHashTableBase *dst) const;
 
-  bool checkAggregatePartitioned(
-      const std::size_t estimated_num_groups,
-      const std::vector<bool> &is_distinct,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const std::vector<const AggregateFunction *> &aggregate_functions) const {
-    // If there's no aggregation, return false.
-    if (aggregate_functions.empty()) {
-      return false;
-    }
-    // Check if there's a distinct operation involved in any aggregate, if so
-    // the aggregate can't be partitioned.
-    for (auto distinct : is_distinct) {
-      if (distinct) {
-        return false;
-      }
-    }
-    // There's no distinct aggregation involved, Check if there's at least one
-    // GROUP BY operation.
-    if (group_by.empty()) {
-      return false;
-    }
-    // There are GROUP BYs without DISTINCT. Check if the estimated number of
-    // groups is large enough to warrant a partitioned aggregation.
-    return estimated_num_groups >
-           static_cast<std::size_t>(
-               FLAGS_partition_aggregation_num_groups_threshold);
-  }
+  // Finalize the aggregation results into output_destination.
+  void finalizeSingleState(InsertDestination *output_destination);
+  void finalizeHashTable(const std::size_t partition_id,
+                         InsertDestination *output_destination);
+
+  // Specialized implementations for aggregateBlockHashTable.
+  void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor,
+                                              ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+
+  // Specialized implementations for finalizeHashTable.
+  void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+                                          InsertDestination *output_destination);
+  void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+                                        InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
 
+  // Whether the aggregation is collision free or not.
+  bool is_aggregate_collision_free_;
+
   // Whether the aggregation is partitioned or not.
-  const bool is_aggregate_partitioned_;
+  bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
-  std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
-  // some number of Scalar arguments.
-  std::vector<AggregationHandle *> handles_;
-  std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+  // zero (indicated by -1) or one argument.
+  std::vector<std::unique_ptr<AggregationHandle>> handles_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
   // arguments.
   std::vector<bool> is_distinct_;
 
-  // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      distinctify_hashtables_;
+  // Non-trivial group-by/argument expressions that need to be evaluated.
+  std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
+  std::vector<attribute_id> group_by_key_ids_;
+  std::vector<std::vector<attribute_id>> argument_ids_;
+
+  std::vector<const Type *> group_by_types_;
+
+  // Hash table for obtaining distinct (i.e. unique) arguments.
+//  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+//      distinctify_hashtables_;
 
   // Per-aggregate global states for aggregation without GROUP BY.
   std::vector<std::unique_ptr<AggregationState>> single_states_;
@@ -303,6 +279,8 @@ class AggregationOperationState {
 
   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
 
+  std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fddea1f..c7bc28f 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
+            CollisionFreeAggregationStateHashTable.cpp
+            CollisionFreeAggregationStateHashTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -225,6 +225,9 @@ add_library(quickstep_storage_InsertDestination_proto
 add_library(quickstep_storage_LinearOpenAddressingHashTable
             ../empty_src.cpp
             LinearOpenAddressingHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
+            PackedPayloadAggregationStateHashTable.cpp
+            PackedPayloadAggregationStateHashTable.hpp)
 add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -276,22 +279,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_AggregationOperationState_proto
-                      quickstep_storage_HashTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
+                      quickstep_utility_ConcurrentBitVector
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
@@ -429,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ConcurrentBitVector
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -626,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_storage_TupleReference
-                      quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil
-                      quickstep_threading_SpinMutex
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_HashPair
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
-                      glog
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTable_proto
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
-                      quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_SeparateChainingHashTable
-                      quickstep_storage_SimpleScalarSeparateChainingHashTable
-                      quickstep_storage_TupleReference
-                      quickstep_types_TypeFactory
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableKeyManager
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_Alignment
-                      quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -734,10 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
                       quickstep_storage_LinearOpenAddressingHashTable
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
@@ -757,9 +737,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_threading_SpinMutex
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
@@ -817,12 +796,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
+target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableKeyManager
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleReference
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Alignment
+                      quickstep_utility_HashPair
+                      quickstep_utility_Macros
+                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_PartitionedHashTablePool
                       glog
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 target_link_libraries(quickstep_storage_PreloaderThread
@@ -933,7 +932,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       glog
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
@@ -942,7 +940,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
                       quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
                       quickstep_storage_CountedReference
-                      quickstep_storage_HashTableBase
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_SMAIndexSubBlock
@@ -1111,6 +1108,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1123,9 +1121,6 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
-                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
@@ -1139,6 +1134,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b46bc73c/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
new file mode 100644
index 0000000..15d4dfe
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+namespace quickstep {
+
+// Constructs a table keyed by exactly one attribute (enforced by the CHECK
+// below) with room for 'num_entries' slots.
+//
+// A single blob obtained from 'storage_manager' backs the whole table. Its
+// layout is: the existence bit vector at offset 0, followed by one state array
+// per aggregation handle. Every region is sized with CacheLineAlignedBytes().
+//
+// Only COUNT, and SUM over kInt/kLong/kFloat/kDouble arguments, are handled;
+// any other aggregate or argument type ends in LOG(FATAL).
+//
+// The blob's memory is NOT zeroed here: the bit vector is created with
+// initialize == false. NOTE(review): num_init_partitions_ presumably tells the
+// caller how many chunks it must zero before the first upsert -- confirm
+// against the users of this class.
+CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    // Guard front(): calling it on an empty vector is undefined behavior, and
+    // the CHECK_EQ in the body only runs after this initializer list.
+    : key_type_(key_types.empty() ? nullptr : key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      // One finalization partition per 4096 entries, capped at 80. Uses the
+      // constructor parameter rather than the member so that the result does
+      // not depend on member declaration order. The explicit template argument
+      // keeps std::min well-formed where std::size_t is not 'unsigned long'.
+      num_finalize_partitions_(
+          std::min<std::size_t>((num_entries >> 12u) + 1u, 80u)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  // The existence map always sits at the very beginning of the blob.
+  constexpr std::size_t kExistenceMapOffset = 0;
+  std::size_t required_memory =
+      CacheLineAlignedBytes(ConcurrentBitVector::BytesNeeded(num_entries));
+
+  // Byte offset of each handle's state array inside the blob, indexed by
+  // handle position.
+  std::vector<std::size_t> state_offsets;
+  state_offsets.reserve(num_handles_);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    // Bind by reference instead of copying the vector on every iteration.
+    const auto &argument_types = handle->getArgumentTypes();
+
+    // Size in bytes of one slot's state for this aggregate.
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    state_offsets.push_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  char *memory_start = static_cast<char *>(blob_->getMemoryMutable());
+  existence_map_.reset(new ConcurrentBitVector(
+      memory_start + kExistenceMapOffset,
+      num_entries,
+      false /* initialize */));
+
+  for (const std::size_t state_offset : state_offsets) {
+    vec_tables_.emplace_back(memory_start + state_offset);
+  }
+
+  memory_size_ = required_memory;
+
+  // One initialization partition per 4MB of table memory, capped at 80 and
+  // never fewer than 1: without the lower bound every table smaller than 4MB
+  // would report zero initialization partitions.
+  constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
+  num_init_partitions_ = std::max<std::size_t>(
+      1u, std::min<std::size_t>(memory_size_ / kInitBlockSize, 80u));
+}
+
+// Drops this table's reference to its backing blob and then asks the storage
+// manager to delete that blob.
+CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
+  // The id has to be read while the reference is still held.
+  const block_id backing_blob_id = blob_->getID();
+
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(backing_blob_id);
+}
+
+// Intentionally a no-op: this implementation performs no per-entry cleanup.
+// NOTE(review): presumably safe because the stored states need no destructor
+// call -- confirm against the state types used by this table.
+void CollisionFreeAggregationStateHashTable::destroyPayload() {
+}
+
+// Feeds the tuples behind the accessors into every aggregate's state array.
+//
+// 'key_attr_ids' must hold exactly one id and each entry of 'argument_ids' at
+// most one. A non-negative id is looked up in 'base_accessor'; a negative id
+// is mapped to -(id + 2) and looked up in 'aux_accessor'. An aggregate with no
+// argument gets kInvalidAttributeID as its argument id.
+//
+// Always returns true.
+bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
+    const std::vector<std::vector<attribute_id>> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK_EQ(1u, key_attr_ids.size());
+
+  const attribute_id key_attr_id = key_attr_ids.front();
+  const bool is_key_nullable = key_type_->isNullable();
+  const bool key_in_base = (key_attr_id >= 0);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const std::vector<attribute_id> &handle_argument_ids = argument_ids[i];
+    DCHECK_LE(handle_argument_ids.size(), 1u);
+
+    attribute_id argument_id = kInvalidAttributeID;
+    if (!handle_argument_ids.empty()) {
+      argument_id = handle_argument_ids.front();
+    }
+    const bool argument_in_base = (argument_id >= 0);
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+
+    // Aggregates without an argument have no argument type to inspect.
+    const Type *argument_type = nullptr;
+    bool is_argument_nullable = false;
+    if (!argument_types.empty()) {
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
+
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      // Key and argument both come from the base accessor.
+      if (key_in_base && argument_in_base) {
+        upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                 is_argument_nullable,
+                                                 key_type_,
+                                                 argument_type,
+                                                 handle->getAggregationID(),
+                                                 key_attr_id,
+                                                 argument_id,
+                                                 vec_tables_[i],
+                                                 accessor,
+                                                 accessor);
+        return;
+      }
+
+      // Key from the base accessor, argument from the auxiliary accessor.
+      if (key_in_base) {
+        upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                is_argument_nullable,
+                                                key_type_,
+                                                argument_type,
+                                                handle->getAggregationID(),
+                                                key_attr_id,
+                                                -(argument_id+2),
+                                                vec_tables_[i],
+                                                accessor,
+                                                aux_accessor);
+        return;
+      }
+
+      // Key from the auxiliary accessor, argument from the base accessor.
+      if (argument_in_base) {
+        upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                is_argument_nullable,
+                                                key_type_,
+                                                argument_type,
+                                                handle->getAggregationID(),
+                                                -(key_attr_id+2),
+                                                argument_id,
+                                                vec_tables_[i],
+                                                aux_accessor,
+                                                accessor);
+        return;
+      }
+
+      // Key and argument both come from the auxiliary accessor.
+      upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                               is_argument_nullable,
+                                               key_type_,
+                                               argument_type,
+                                               handle->getAggregationID(),
+                                               -(key_attr_id+2),
+                                               -(argument_id+2),
+                                               vec_tables_[i],
+                                               aux_accessor,
+                                               aux_accessor);
+    });
+  }
+  return true;
+}
+
+// Writes the keys belonging to finalization partition 'partition_id' into
+// 'output_cv'. Only kInt and kLong key types are handled; any other key type
+// ends in LOG(FATAL).
+void CollisionFreeAggregationStateHashTable::finalizeKey(
+    const std::size_t partition_id,
+    NativeColumnVector *output_cv) const {
+  // Range of slot positions covered by this partition.
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const auto key_type_id = key_type_->getTypeID();
+  if (key_type_id == TypeID::kInt) {
+    finalizeKeyInternal<int>(range_begin, range_end, output_cv);
+  } else if (key_type_id == TypeID::kLong) {
+    finalizeKeyInternal<std::int64_t>(range_begin, range_end, output_cv);
+  } else {
+    LOG(FATAL) << "Not supported";
+  }
+}
+
+// Writes the final values of aggregate 'handle_id' for finalization partition
+// 'partition_id' into 'output_cv'. The per-aggregate and per-type work is
+// delegated to finalizeStateDispatchHelper().
+void CollisionFreeAggregationStateHashTable::finalizeState(
+    const std::size_t partition_id,
+    std::size_t handle_id,
+    NativeColumnVector *output_cv) const {
+  // Range of slot positions covered by this partition.
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+
+  // Aggregates without an argument pass a null argument type.
+  const Type *argument_type = nullptr;
+  if (!argument_types.empty()) {
+    argument_type = argument_types.front();
+  }
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              range_begin,
+                              range_end,
+                              output_cv);
+}
+
+}  // namespace quickstep