You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/07 22:35:28 UTC

[09/14] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index b942c1b..0b34908 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -20,7 +20,7 @@
 #include "storage/AggregationOperationState.hpp"
 
 #include <cstddef>
-#include <cstdio>
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <utility>
@@ -34,29 +34,32 @@
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunctionFactory.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableBase.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTableFactory.hpp"
+#include "storage/HashTableBase.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
 #include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
-#include "glog/logging.h"
+#include "gflags/gflags.h"
 
-using std::unique_ptr;
+#include "glog/logging.h"
 
 namespace quickstep {
 
@@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
-      is_aggregate_partitioned_(checkAggregatePartitioned(
-          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+      is_aggregate_collision_free_(false),
+      is_aggregate_partitioned_(false),
       predicate_(predicate),
-      group_by_list_(std::move(group_by)),
-      arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
       storage_manager_(storage_manager) {
+  if (!group_by.empty()) {
+    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
+      is_aggregate_collision_free_ = true;
+    } else {
+      is_aggregate_partitioned_ = checkAggregatePartitioned(
+          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    }
+  }
+
   // Sanity checks: each aggregate has a corresponding list of arguments.
-  DCHECK(aggregate_functions.size() == arguments_.size());
+  DCHECK(aggregate_functions.size() == arguments.size());
 
   // Get the types of GROUP BY expressions for creating HashTables below.
-  std::vector<const Type *> group_by_types;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    group_by_types.emplace_back(&group_by_element->getType());
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    group_by_types_.emplace_back(&group_by_element->getType());
+  }
+
+  // Prepare group-by key ids and non-trivial expressions.
+  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
+    const attribute_id attr_id =
+        group_by_element->getAttributeIdForValueAccessor();
+    if (attr_id != kInvalidAttributeID) {
+      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
+    } else {
+      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
+                                     non_trivial_expressions_.size());
+      non_trivial_expressions_.emplace_back(group_by_element.release());
+    }
   }
 
   std::vector<AggregationHandle *> group_by_handles;
-  group_by_handles.clear();
-
-  if (aggregate_functions.size() == 0) {
-    // If there is no aggregation function, then it is a distinctify operation
-    // on the group-by expressions.
-    DCHECK_GT(group_by_list_.size(), 0u);
-
-    handles_.emplace_back(new AggregationHandleDistinct());
-    arguments_.push_back({});
-    is_distinct_.emplace_back(false);
-    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                     hash_table_impl_type,
-                                                     group_by_types,
-                                                     {1},
-                                                     handles_,
-                                                     storage_manager));
-  } else {
-    // Set up each individual aggregate in this operation.
-    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
-        aggregate_functions.begin();
-    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
-        args_it = arguments_.begin();
-    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
-    std::vector<HashTableImplType>::const_iterator
-        distinctify_hash_table_impl_types_it =
-            distinctify_hash_table_impl_types.begin();
-    std::vector<std::size_t> payload_sizes;
-    for (; agg_func_it != aggregate_functions.end();
-         ++agg_func_it, ++args_it, ++is_distinct_it) {
-      // Get the Types of this aggregate's arguments so that we can create an
-      // AggregationHandle.
-      std::vector<const Type *> argument_types;
-      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-        argument_types.emplace_back(&argument->getType());
-      }
 
-      // Sanity checks: aggregate function exists and can apply to the specified
-      // arguments.
-      DCHECK(*agg_func_it != nullptr);
-      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
-
-      // Have the AggregateFunction create an AggregationHandle that we can use
-      // to do actual aggregate computation.
-      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
-
-      if (!group_by_list_.empty()) {
-        // Aggregation with GROUP BY: combined payload is partially updated in
-        // the presence of DISTINCT.
-        if (*is_distinct_it) {
-          handles_.back()->blockUpdate();
-        }
-        group_by_handles.emplace_back(handles_.back());
-        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+  // Set up each individual aggregate in this operation.
+  std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+      aggregate_functions.begin();
+  std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
+      args_it = arguments.begin();
+  std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
+  std::vector<HashTableImplType>::const_iterator
+      distinctify_hash_table_impl_types_it =
+          distinctify_hash_table_impl_types.begin();
+  for (; agg_func_it != aggregate_functions.end();
+       ++agg_func_it, ++args_it, ++is_distinct_it) {
+    // Get the Types of this aggregate's arguments so that we can create an
+    // AggregationHandle.
+    std::vector<const Type *> argument_types;
+    for (const std::unique_ptr<const Scalar> &argument : *args_it) {
+      argument_types.emplace_back(&argument->getType());
+    }
+
+    // Prepare argument attribute ids and non-trivial expressions.
+    std::vector<MultiSourceAttributeId> argument_ids;
+    for (std::unique_ptr<const Scalar> &argument : *args_it) {
+      const attribute_id attr_id =
+          argument->getAttributeIdForValueAccessor();
+      if (attr_id != kInvalidAttributeID) {
+        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
       } else {
-        // Aggregation without GROUP BY: create a single global state.
-        single_states_.emplace_back(handles_.back()->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-        // See if all of this aggregate's arguments are attributes in the input
-        // relation. If so, remember the attribute IDs so that we can do copy
-        // elision when actually performing the aggregation.
-        std::vector<attribute_id> local_arguments_as_attributes;
-        local_arguments_as_attributes.reserve(args_it->size());
-        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
-          const attribute_id argument_id =
-              argument->getAttributeIdForValueAccessor();
-          if (argument_id == -1) {
-            local_arguments_as_attributes.clear();
-            break;
-          } else {
-            DCHECK_EQ(input_relation_.getID(),
-                      argument->getRelationIdForValueAccessor());
-            local_arguments_as_attributes.push_back(argument_id);
-          }
-        }
-
-        arguments_as_attributes_.emplace_back(
-            std::move(local_arguments_as_attributes));
-#endif
+        argument_ids.emplace_back(ValueAccessorSource::kDerived,
+                                  non_trivial_expressions_.size());
+        non_trivial_expressions_.emplace_back(argument.release());
       }
+    }
+    argument_ids_.emplace_back(std::move(argument_ids));
+
+    // Sanity checks: aggregate function exists and can apply to the specified
+    // arguments.
+    DCHECK(*agg_func_it != nullptr);
+    DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
 
-      // Initialize the corresponding distinctify hash table if this is a
-      // DISTINCT aggregation.
+    // Have the AggregateFunction create an AggregationHandle that we can use
+    // to do actual aggregate computation.
+    handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
+
+    if (!group_by_key_ids_.empty()) {
+      // Aggregation with GROUP BY: combined payload is partially updated in
+      // the presence of DISTINCT.
       if (*is_distinct_it) {
-        std::vector<const Type *> key_types(group_by_types);
-        key_types.insert(
-            key_types.end(), argument_types.begin(), argument_types.end());
-        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating the number of entries in the distinctify hash table.
-        // We may estimate for each distinct aggregation an
-        // estimated_num_distinct_keys value during query optimization, if it's
-        // worth.
-        distinctify_hashtables_.emplace_back(
-            AggregationStateFastHashTableFactory::CreateResizable(
-                *distinctify_hash_table_impl_types_it,
-                key_types,
-                estimated_num_entries,
-                {0},
-                {},
-                storage_manager));
-        ++distinctify_hash_table_impl_types_it;
-      } else {
-        distinctify_hashtables_.emplace_back(nullptr);
+        handles_.back()->blockUpdate();
       }
+      group_by_handles.emplace_back(handles_.back().get());
+    } else {
+      // Aggregation without GROUP BY: create a single global state.
+      single_states_.emplace_back(handles_.back()->createInitialState());
     }
 
-    if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool.
-      if (!is_aggregate_partitioned_) {
-        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                         hash_table_impl_type,
-                                                         group_by_types,
-                                                         payload_sizes,
-                                                         group_by_handles,
-                                                         storage_manager));
-      } else {
-        partitioned_group_by_hashtable_pool_.reset(
-            new PartitionedHashTablePool(estimated_num_entries,
-                                         FLAGS_num_aggregation_partitions,
-                                         hash_table_impl_type,
-                                         group_by_types,
-                                         payload_sizes,
-                                         group_by_handles,
-                                         storage_manager));
-      }
+    // Initialize the corresponding distinctify hash table if this is a
+    // DISTINCT aggregation.
+    if (*is_distinct_it) {
+      std::vector<const Type *> key_types(group_by_types_);
+      key_types.insert(
+          key_types.end(), argument_types.begin(), argument_types.end());
+      // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+      // estimating the number of entries in the distinctify hash table.
+      // We need to estimate for each distinct aggregation an
+      // estimated_num_distinct_keys value during query optimization.
+      distinctify_hashtables_.emplace_back(
+          AggregationStateHashTableFactory::CreateResizable(
+              *distinctify_hash_table_impl_types_it,
+              key_types,
+              estimated_num_entries,
+              {} /* handles */,
+              storage_manager));
+      ++distinctify_hash_table_impl_types_it;
+    } else {
+      distinctify_hashtables_.emplace_back(nullptr);
+    }
+  }
+
+  if (!group_by_key_ids_.empty()) {
+    // Aggregation with GROUP BY: create the hash table (pool).
+    if (is_aggregate_collision_free_) {
+      collision_free_hashtable_.reset(
+          AggregationStateHashTableFactory::CreateResizable(
+              hash_table_impl_type,
+              group_by_types_,
+              estimated_num_entries,
+              group_by_handles,
+              storage_manager));
+    } else if (is_aggregate_partitioned_) {
+      partitioned_group_by_hashtable_pool_.reset(
+          new PartitionedHashTablePool(estimated_num_entries,
+                                       FLAGS_num_aggregation_partitions,
+                                       hash_table_impl_type,
+                                       group_by_types_,
+                                       group_by_handles,
+                                       storage_manager));
+    } else {
+      group_by_hashtable_pool_.reset(
+          new HashTablePool(estimated_num_entries,
+                            hash_table_impl_type,
+                            group_by_types_,
+                            group_by_handles,
+                            storage_manager));
     }
   }
 }
@@ -269,7 +269,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
         proto.group_by_expressions(group_by_idx), database));
   }
 
-  unique_ptr<Predicate> predicate;
+  std::unique_ptr<Predicate> predicate;
   if (proto.has_predicate()) {
     predicate.reset(
         PredicateFactory::ReconstructFromProto(proto.predicate(), database));
@@ -353,153 +353,210 @@ bool AggregationOperationState::ProtoIsValid(
   return true;
 }
 
-void AggregationOperationState::aggregateBlock(const block_id input_block,
-                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
-  if (group_by_list_.empty()) {
-    aggregateBlockSingleState(input_block);
-  } else {
-    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+bool AggregationOperationState::checkAggregatePartitioned(
+    const std::size_t estimated_num_groups,
+    const std::vector<bool> &is_distinct,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const std::vector<const AggregateFunction *> &aggregate_functions) const {
+  // If there's no aggregation, return false.
+  if (aggregate_functions.empty()) {
+    return false;
+  }
+  // Check if there's a distinct operation involved in any aggregate, if so
+  // the aggregate can't be partitioned.
+  for (auto distinct : is_distinct) {
+    if (distinct) {
+      return false;
+    }
+  }
+  // There's no distinct aggregation involved, Check if there's at least one
+  // GROUP BY operation.
+  if (group_by.empty()) {
+    return false;
+  }
+
+  // Currently we require that all the group-by keys are ScalarAttributes for
+  // the convenient of implementing copy elision.
+  // TODO(jianqiao): relax this requirement.
+  for (const auto &group_by_element : group_by) {
+    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
+      return false;
+    }
   }
+
+  // There are GROUP BYs without DISTINCT. Check if the estimated number of
+  // groups is large enough to warrant a partitioned aggregation.
+  return estimated_num_groups >
+         static_cast<std::size_t>(
+             FLAGS_partition_aggregation_num_groups_threshold);
+  return false;
 }
 
-void AggregationOperationState::finalizeAggregate(
-    InsertDestination *output_destination) {
-  if (group_by_list_.empty()) {
-    finalizeSingleState(output_destination);
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getNumInitializationPartitions();
   } else {
-    finalizeHashTable(output_destination);
+    return 0u;
   }
 }
 
-void AggregationOperationState::mergeSingleState(
-    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
-  DEBUG_ASSERT(local_state.size() == single_states_.size());
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (!is_distinct_[agg_idx]) {
-      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
-                                     single_states_[agg_idx].get());
-    }
+std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getNumFinalizationPartitions();
+  } else if (is_aggregate_partitioned_) {
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+  } else  {
+    return 1u;
   }
 }
 
-void AggregationOperationState::aggregateBlockSingleState(
-    const block_id input_block) {
-  // Aggregate per-block state for each aggregate.
-  std::vector<std::unique_ptr<AggregationState>> local_state;
+void AggregationOperationState::initialize(const std::size_t partition_id) {
+  if (is_aggregate_collision_free_) {
+    static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->initialize(partition_id);
+  } else {
+    LOG(FATAL) << "AggregationOperationState::initializeState() "
+               << "is not supported by this aggregation";
+  }
+}
 
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
+  const auto &tuple_store = block->getTupleStorageSubBlock();
+  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
+  std::unique_ptr<ValueAccessor> shared_accessor;
+  ValueAccessor *accessor = base_accessor.get();
 
+  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
+  // as the existence map for the tuples.
   std::unique_ptr<TupleIdSequence> matches;
   if (predicate_ != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor());
-    matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
+    matches.reset(block->getMatchesForPredicate(predicate_.get()));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
+  }
+  if (lip_filter_adaptive_prober != nullptr) {
+    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
+    shared_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+    accessor = shared_accessor.get();
   }
 
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-    // If all arguments are attributes of the input relation, elide a copy.
-    if (!arguments_as_attributes_[agg_idx].empty()) {
-      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
+  std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
+  if (!non_trivial_expressions_.empty()) {
+    non_trivial_results.reset(new ColumnVectorsValueAccessor());
+    SubBlocksReference sub_blocks_ref(tuple_store,
+                                      block->getIndices(),
+                                      block->getIndicesConsistent());
+    for (const auto &expression : non_trivial_expressions_) {
+      non_trivial_results->addColumn(
+          expression->getAllValues(accessor, &sub_blocks_ref));
     }
-#endif
+  }
+
+  accessor->beginIterationVirtual();
+
+  ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
+  if (group_by_key_ids_.empty()) {
+    aggregateBlockSingleState(accessor_mux);
+  } else {
+    aggregateBlockHashTable(accessor_mux);
+  }
+}
+
+void AggregationOperationState::aggregateBlockSingleState(
+    const ValueAccessorMultiplexer &accessor_mux) {
+  // Aggregate per-block state for each aggregate.
+  std::vector<std::unique_ptr<AggregationState>> local_state;
+
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    const auto &argument_ids = argument_ids_[agg_idx];
+    const auto &handle = handles_[agg_idx];
+
+    AggregationState *state = nullptr;
     if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
-      // directly into the (threadsafe) shared global distinctify HashTable
-      // for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               local_arguments_as_attributes,
-                               {}, /* group_by */
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               nullptr /* reuse_group_by_vectors */);
-      local_state.emplace_back(nullptr);
+      handle->insertValueAccessorIntoDistinctifyHashTable(
+          argument_ids,
+          {},
+          accessor_mux,
+          distinctify_hashtables_[agg_idx].get());
     } else {
-      // Call StorageBlock::aggregate() to actually do the aggregation.
-      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
-                                                arguments_[agg_idx],
-                                                local_arguments_as_attributes,
-                                                matches.get()));
+      if (argument_ids.empty()) {
+        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
+        ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+        DCHECK(base_accessor != nullptr);
+        state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
+      } else {
+        // Have the AggregationHandle actually do the aggregation.
+        state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
+      }
     }
+    local_state.emplace_back(state);
   }
 
   // Merge per-block aggregation states back with global state.
   mergeSingleState(local_state);
 }
 
-void AggregationOperationState::aggregateBlockHashTable(
-    const block_id input_block,
-    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
-  BlockReference block(
-      storage_manager_->getBlock(input_block, input_relation_));
-
-  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
-  // as the existence map for the tuples.
-  std::unique_ptr<TupleIdSequence> matches;
-  if (predicate_ != nullptr) {
-    matches.reset(block->getMatchesForPredicate(predicate_.get()));
-  }
-  if (lip_filter_adaptive_prober != nullptr) {
-    std::unique_ptr<ValueAccessor> accessor(
-        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
-    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
-  }
-
-  // This holds values of all the GROUP BY attributes so that the can be reused
-  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
-  // GROUP BY expressions once).
-  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
-
+void AggregationOperationState::mergeSingleState(
+    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+  DCHECK_EQ(local_state.size(), single_states_.size());
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
-      // expression
-      // values and the aggregation arguments together as keys directly into the
-      // (threadsafe) shared global distinctify HashTable for this aggregate.
-      block->aggregateDistinct(*handles_[agg_idx],
-                               arguments_[agg_idx],
-                               nullptr, /* arguments_as_attributes */
-                               group_by_list_,
-                               matches.get(),
-                               distinctify_hashtables_[agg_idx].get(),
-                               &reuse_group_by_vectors);
+    if (!is_distinct_[agg_idx]) {
+      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+                                     single_states_[agg_idx].get());
     }
   }
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+    AggregationStateHashTableBase *src,
+    AggregationStateHashTableBase *dst) const {
+  HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
+  static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
+}
 
-  if (!is_aggregate_partitioned_) {
-    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-    // directly into the (threadsafe) shared global HashTable for this
-    // aggregate.
-    DCHECK(group_by_hashtable_pool_ != nullptr);
-    AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTableFast();
-    DCHECK(agg_hash_table != nullptr);
-    block->aggregateGroupBy(arguments_,
-                            group_by_list_,
-                            matches.get(),
-                            agg_hash_table,
-                            &reuse_group_by_vectors);
-    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+void AggregationOperationState::aggregateBlockHashTable(
+    const ValueAccessorMultiplexer &accessor_mux) {
+  if (is_aggregate_collision_free_) {
+    aggregateBlockHashTableImplCollisionFree(accessor_mux);
+  } else if (is_aggregate_partitioned_) {
+    aggregateBlockHashTableImplPartitioned(accessor_mux);
   } else {
-    ColumnVectorsValueAccessor temp_result;
-    // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-    std::vector<attribute_id> argument_ids;
+    aggregateBlockHashTableImplThreadPrivate(accessor_mux);
+  }
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK(collision_free_hashtable_ != nullptr);
+
+  collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
+                                                             group_by_key_ids_,
+                                                             accessor_mux);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+  std::vector<attribute_id> group_by_key_ids;
+  for (const MultiSourceAttributeId &key_id : group_by_key_ids_) {
+    DCHECK(key_id.source == ValueAccessorSource::kBase);
+    group_by_key_ids.emplace_back(key_id.attr_id);
+  }
 
-    // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-    std::vector<attribute_id> key_ids;
+  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+      accessor_mux.getBaseAccessor(),
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    // TODO(jianqiao): handle the situation when keys in non_trivial_results
     const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-    block->aggregateGroupByPartitioned(
-        arguments_,
-        group_by_list_,
-        matches.get(),
-        num_partitions,
-        &temp_result,
-        &argument_ids,
-        &key_ids,
-        &reuse_group_by_vectors);
+
     // Compute the partitions for the tuple formed by group by values.
     std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
     partition_membership.resize(num_partitions);
@@ -507,32 +564,74 @@ void AggregationOperationState::aggregateBlockHashTable(
     // Create a tuple-id sequence for each partition.
     for (std::size_t partition = 0; partition < num_partitions; ++partition) {
       partition_membership[partition].reset(
-          new TupleIdSequence(temp_result.getEndPosition()));
+          new TupleIdSequence(accessor->getEndPosition()));
     }
 
     // Iterate over ValueAccessor for each tuple,
     // set a bit in the appropriate TupleIdSequence.
-    temp_result.beginIteration();
-    while (temp_result.next()) {
+    while (accessor->next()) {
       // We need a unique_ptr because getTupleWithAttributes() uses "new".
-      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+      std::unique_ptr<Tuple> curr_tuple(
+          accessor->getTupleWithAttributes(group_by_key_ids));
       const std::size_t curr_tuple_partition_id =
           curr_tuple->getTupleHash() % num_partitions;
       partition_membership[curr_tuple_partition_id]->set(
-          temp_result.getCurrentPosition(), true);
+          accessor->getCurrentPosition(), true);
     }
-    // For each partition, create an adapter around Value Accessor and
-    // TupleIdSequence.
-    std::vector<std::unique_ptr<
-        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
-    adapter.resize(num_partitions);
+
+    // Aggregate each partition.
     for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
-          *(partition_membership)[partition]));
+      std::unique_ptr<ValueAccessor> base_adapter(
+          accessor->createSharedTupleIdSequenceAdapter(
+              *partition_membership[partition]));
+
+      std::unique_ptr<ValueAccessor> derived_adapter;
+      if (accessor_mux.getDerivedAccessor() != nullptr) {
+        derived_adapter.reset(
+            accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual(
+                *partition_membership[partition]));
+      }
+
+      ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
       partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKeyFast(
-              argument_ids, adapter[partition].get(), key_ids, true);
+          ->upsertValueAccessorCompositeKey(argument_ids_,
+                                            group_by_key_ids_,
+                                            local_mux);
     }
+  });
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK(group_by_hashtable_pool_ != nullptr);
+
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    if (is_distinct_[agg_idx]) {
+      handles_[agg_idx]->insertValueAccessorIntoDistinctifyHashTable(
+          argument_ids_[agg_idx],
+          group_by_key_ids_,
+          accessor_mux,
+          distinctify_hashtables_[agg_idx].get());
+    }
+  }
+
+  AggregationStateHashTableBase *agg_hash_table =
+      group_by_hashtable_pool_->getHashTable();
+
+  agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+                                                  group_by_key_ids_,
+                                                  accessor_mux);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+}
+
+void AggregationOperationState::finalizeAggregate(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (group_by_key_ids_.empty()) {
+    DCHECK_EQ(0u, partition_id);
+    finalizeSingleState(output_destination);
+  } else {
+    finalizeHashTable(partition_id, output_destination);
   }
 }
 
@@ -556,80 +655,83 @@ void AggregationOperationState::finalizeSingleState(
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
-  HashTableMergerFast merger(dst);
-  (static_cast<FastHashTable<true, false, true, false> *>(src))
-      ->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::finalizeHashTable(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (is_aggregate_collision_free_) {
+    finalizeHashTableImplCollisionFree(partition_id, output_destination);
+  } else if (is_aggregate_partitioned_) {
+    finalizeHashTableImplPartitioned(partition_id, output_destination);
+  } else {
+    DCHECK_EQ(0u, partition_id);
+    finalizeHashTableImplThreadPrivate(output_destination);
+  }
 }
 
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  CollisionFreeVectorTable *hash_table =
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
+
+  const std::size_t max_length =
+      hash_table->getNumTuplesInFinalizationPartition(partition_id);
+  ColumnVectorsValueAccessor complete_result;
+
+  DCHECK_EQ(1u, group_by_types_.size());
+  const Type *key_type = group_by_types_.front();
+  DCHECK(NativeColumnVector::UsableForType(*key_type));
+
+  std::unique_ptr<NativeColumnVector> key_cv(
+      std::make_unique<NativeColumnVector>(*key_type, max_length));
+  hash_table->finalizeKey(partition_id, key_cv.get());
+  complete_result.addColumn(key_cv.release());
+
+  for (std::size_t i = 0; i < handles_.size(); ++i) {
+    const Type *result_type = handles_[i]->getResultType();
+    DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+    std::unique_ptr<NativeColumnVector> result_cv(
+        std::make_unique<NativeColumnVector>(*result_type, max_length));
+    hash_table->finalizeState(partition_id, i, result_cv.get());
+    complete_result.addColumn(result_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+    const std::size_t partition_id,
     InsertDestination *output_destination) {
+  PackedPayloadHashTable *hash_table =
+      static_cast<PackedPayloadHashTable *>(
+          partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
-  // TODO(harshad) - The merge phase may be slower when each hash table contains
-  // large number of entries. We should find ways in which we can perform a
-  // parallel merge.
+  if (handles_.empty()) {
+    const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+                                                 const std::uint8_t *dumb_placeholder) -> void {
+      group_by_keys.emplace_back(std::move(group_by_key));
+    };
 
-  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
-  // e.g. Keep merging entries from smaller hash tables to larger.
-
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  if (hash_tables->size() > 1) {
-    for (int hash_table_index = 0;
-         hash_table_index < static_cast<int>(hash_tables->size() - 1);
-         ++hash_table_index) {
-      // Merge each hash table to the last hash table.
-      mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
-                             hash_tables->back().get());
-    }
+    hash_table->forEachCompositeKey(&keys_retriever);
   }
 
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pool_ != nullptr);
-      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      DCHECK(hash_tables != nullptr);
-      if (hash_tables->empty()) {
-        // We may have a case where hash_tables is empty, e.g. no input blocks.
-        // However for aggregateOnDistinctifyHashTableForGroupBy to work
-        // correctly, we should create an empty group by hash table.
-        AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pool_->getHashTableFast();
-        group_by_hashtable_pool_->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pool_->getAllHashTables();
-      }
-      DCHECK(hash_tables->back() != nullptr);
-      AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-      DCHECK(agg_hash_table != nullptr);
-      handles_[agg_idx]->allowUpdate();
-      handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
-    }
-
-    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(hash_tables != nullptr);
-    if (hash_tables->empty()) {
-      // We may have a case where hash_tables is empty, e.g. no input blocks.
-      // However for aggregateOnDistinctifyHashTableForGroupBy to work
-      // correctly, we should create an empty group by hash table.
-      AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pool_->getHashTableFast();
-      group_by_hashtable_pool_->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-    AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
-    DCHECK(agg_hash_table != nullptr);
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
+        *hash_table, agg_idx, &group_by_keys);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
+  hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -640,23 +742,20 @@ void AggregationOperationState::finalizeHashTable(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
       NativeColumnVector *element_cv =
-          new NativeColumnVector(group_by_type, group_by_keys.size());
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     } else {
       IndirectColumnVector *element_cv =
-          new IndirectColumnVector(group_by_type, group_by_keys.size());
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;
@@ -676,42 +775,64 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::destroyAggregationHashTablePayload() {
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
-      nullptr;
-  if (!is_aggregate_partitioned_) {
-    if (group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    }
-  } else {
-    if (partitioned_group_by_hashtable_pool_ != nullptr) {
-      all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
-    }
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+    InsertDestination *output_destination) {
+  // TODO(harshad) - The merge phase may be slower when each hash table contains
+  // large number of entries. We should find ways in which we can perform a
+  // parallel merge.
+
+  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+  // e.g. Keep merging entries from smaller hash tables to larger.
+
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
   }
-  if (all_hash_tables != nullptr) {
-    for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
-      (*all_hash_tables)[ht_index]->destroyPayload();
-    }
+
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
+      hash_tables->back().release());
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+    hash_table->destroyPayload();
   }
-}
 
-void AggregationOperationState::finalizeAggregatePartitioned(
-    const std::size_t partition_id, InsertDestination *output_destination) {
+  PackedPayloadHashTable *final_hash_table =
+      static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
+
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
+  if (handles_.empty()) {
+    const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+                                                 const std::uint8_t *dumb_placeholder) -> void {
+      group_by_keys.emplace_back(std::move(group_by_key));
+    };
+
+    final_hash_table->forEachCompositeKey(&keys_retriever);
+  }
+
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    AggregationStateHashTableBase *hash_table =
-        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
-    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *hash_table, &group_by_keys, agg_idx);
-    if (agg_result_col != nullptr) {
-      final_values.emplace_back(agg_result_col);
+    if (is_distinct_[agg_idx]) {
+      handles_[agg_idx]->allowUpdate();
+      handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
+          *distinctify_hashtables_[agg_idx], agg_idx, final_hash_table);
     }
+
+    ColumnVector *agg_result_col =
+        handles_[agg_idx]->finalizeHashTable(
+            *final_hash_table, agg_idx, &group_by_keys);
+    DCHECK(agg_result_col != nullptr);
+
+    final_values.emplace_back(agg_result_col);
   }
+  final_hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -722,19 +843,22 @@ void AggregationOperationState::finalizeAggregatePartitioned(
   // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
-  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
-    const Type &group_by_type = group_by_element->getType();
-    if (NativeColumnVector::UsableForType(group_by_type)) {
-      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+  for (const Type *group_by_type : group_by_types_) {
+    if (NativeColumnVector::UsableForType(*group_by_type)) {
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     } else {
-      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 591e3a1..13ee377 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -24,31 +24,27 @@
 #include <memory>
 #include <vector>
 
-#include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
-#include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "utility/Macros.hpp"
 
-#include "gflags/gflags.h"
-
 namespace quickstep {
 
+namespace serialization { class AggregationOperationState; }
+
 class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
-
-DECLARE_int32(num_aggregation_partitions);
-DECLARE_int32(partition_aggregation_num_groups_threshold);
+class Type;
 
 /** \addtogroup Storage
  *  @{
@@ -156,6 +152,29 @@ class AggregationOperationState {
       const CatalogDatabaseLite &database);
 
   /**
+   * @brief Get the number of partitions to be used for initializing the
+   *        aggregation.
+   *
+   * @return The number of partitions to be used for initializing the aggregation.
+   **/
+  std::size_t getNumInitializationPartitions() const;
+
+  /**
+   * @brief Get the number of partitions to be used for finalizing the
+   *        aggregation.
+   *
+   * @return The number of partitions to be used for finalizing the aggregation.
+   **/
+  std::size_t getNumFinalizationPartitions() const;
+
+  /**
+   * @brief Initialize the specified partition of this aggregation.
+   *
+   * @param partition_id ID of the partition to be initialized.
+   */
+  void initialize(const std::size_t partition_id);
+
+  /**
    * @brief Compute aggregates on the tuples of the given storage block,
    *        updating the running state maintained by this
    *        AggregationOperationState.
@@ -166,127 +185,95 @@ class AggregationOperationState {
    *        the block.
    **/
   void aggregateBlock(const block_id input_block,
-                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
 
   /**
    * @brief Generate the final results for the aggregates managed by this
    *        AggregationOperationState and write them out to StorageBlock(s).
    *
+   * @param partition_id The partition id of this finalize operation.
    * @param output_destination An InsertDestination where the finalized output
    *        tuple(s) from this aggregate are to be written.
    **/
-  void finalizeAggregate(InsertDestination *output_destination);
-
-  /**
-   * @brief Destroy the payloads in the aggregation hash tables.
-   **/
-  void destroyAggregationHashTablePayload();
-
-  /**
-   * @brief Generate the final results for the aggregates managed by this
-   *        AggregationOperationState and write them out to StorageBlock(s).
-   *        In this implementation, each thread picks a hash table belonging to
-   *        a partition and writes its values to StorageBlock(s). There is no
-   *        need to merge multiple hash tables in one, because there is no
-   *        overlap in the keys across two hash tables.
-   *
-   * @param partition_id The ID of the partition for which finalize is being
-   *        performed.
-   * @param output_destination An InsertDestination where the finalized output
-   *        tuple(s) from this aggregate are to be written.
-   **/
-  void finalizeAggregatePartitioned(
-      const std::size_t partition_id, InsertDestination *output_destination);
-
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
+  void finalizeAggregate(const std::size_t partition_id,
+                         InsertDestination *output_destination);
 
-  bool isAggregatePartitioned() const {
-    return is_aggregate_partitioned_;
-  }
+ private:
+  // Check whether partitioned aggregation can be applied.
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const;
 
-  /**
-   * @brief Get the number of partitions to be used for the aggregation.
-   *        For non-partitioned aggregations, we return 1.
-   **/
-  std::size_t getNumPartitions() const {
-    return is_aggregate_partitioned_
-               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
-               : 1;
-  }
+  // Aggregate on input block.
+  void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
 
-  int dflag;
+  void aggregateBlockHashTable(const ValueAccessorMultiplexer &accessor_mux);
 
- private:
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
   void mergeSingleState(
       const std::vector<std::unique_ptr<AggregationState>> &local_state);
 
-  // Aggregate on input block.
-  void aggregateBlockSingleState(const block_id input_block);
-  void aggregateBlockHashTable(const block_id input_block,
-                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
+  void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                              AggregationStateHashTableBase *dst) const;
 
+  // Finalize the aggregation results into output_destination.
   void finalizeSingleState(InsertDestination *output_destination);
-  void finalizeHashTable(InsertDestination *output_destination);
 
-  bool checkAggregatePartitioned(
-      const std::size_t estimated_num_groups,
-      const std::vector<bool> &is_distinct,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const std::vector<const AggregateFunction *> &aggregate_functions) const {
-    // If there's no aggregation, return false.
-    if (aggregate_functions.empty()) {
-      return false;
-    }
-    // Check if there's a distinct operation involved in any aggregate, if so
-    // the aggregate can't be partitioned.
-    for (auto distinct : is_distinct) {
-      if (distinct) {
-        return false;
-      }
-    }
-    // There's no distinct aggregation involved, Check if there's at least one
-    // GROUP BY operation.
-    if (group_by.empty()) {
-      return false;
-    }
-    // There are GROUP BYs without DISTINCT. Check if the estimated number of
-    // groups is large enough to warrant a partitioned aggregation.
-    return estimated_num_groups >
-           static_cast<std::size_t>(
-               FLAGS_partition_aggregation_num_groups_threshold);
-  }
+  void finalizeHashTable(const std::size_t partition_id,
+                         InsertDestination *output_destination);
+
+  // Specialized implementations for aggregateBlockHashTable.
+  void aggregateBlockHashTableImplCollisionFree(
+      const ValueAccessorMultiplexer &accessor_mux);
+
+  void aggregateBlockHashTableImplPartitioned(
+      const ValueAccessorMultiplexer &accessor_mux);
+
+  void aggregateBlockHashTableImplThreadPrivate(
+      const ValueAccessorMultiplexer &accessor_mux);
+
+  // Specialized implementations for finalizeHashTable.
+  void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+                                          InsertDestination *output_destination);
+
+  void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+                                        InsertDestination *output_destination);
+
+  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
 
+  // Whether the aggregation is collision free or not.
+  bool is_aggregate_collision_free_;
+
   // Whether the aggregation is partitioned or not.
-  const bool is_aggregate_partitioned_;
+  bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
-  std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
-  // some number of Scalar arguments.
-  std::vector<AggregationHandle *> handles_;
-  std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
+  // zero (indicated by -1) or one argument.
+  std::vector<std::unique_ptr<AggregationHandle>> handles_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
   // arguments.
   std::vector<bool> is_distinct_;
 
-  // Hash table for obtaining distinct (i.e. unique) arguments.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      distinctify_hashtables_;
+  // Non-trivial group-by/argument expressions that need to be evaluated.
+  std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
+
+  std::vector<MultiSourceAttributeId> group_by_key_ids_;
+  std::vector<std::vector<MultiSourceAttributeId>> argument_ids_;
 
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  // If all an aggregate's argument expressions are simply attributes in
-  // 'input_relation_', then this caches the attribute IDs of those arguments.
-  std::vector<std::vector<attribute_id>> arguments_as_attributes_;
-#endif
+  std::vector<const Type *> group_by_types_;
+
+  // Hash table for obtaining distinct (i.e. unique) arguments.
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
 
   // Per-aggregate global states for aggregation without GROUP BY.
   std::vector<std::unique_ptr<AggregationState>> single_states_;
@@ -295,14 +282,15 @@ class AggregationOperationState {
   //
   // TODO(shoban): We should ideally store the aggregation state together in one
   // hash table to prevent multiple lookups.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      group_by_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
 
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
 
+  std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index a44c3a7..293be17 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeVectorTable
+            CollisionFreeVectorTable.cpp
+            CollisionFreeVectorTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED)
 endif()
 
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
-add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp)
-add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp)
-add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
   add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp)
@@ -226,6 +226,7 @@ add_library(quickstep_storage_InsertDestination_proto
 add_library(quickstep_storage_LinearOpenAddressingHashTable
             ../empty_src.cpp
             LinearOpenAddressingHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp)
 add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -253,6 +254,7 @@ add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.h
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
 add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
+add_library(quickstep_storage_ValueAccessorMultiplexer ../empty_src.cpp ValueAccessorMultiplexer.hpp)
 add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
 add_library(quickstep_storage_WindowAggregationOperationState
             WindowAggregationOperationState.hpp
@@ -272,22 +274,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunctionFactory
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_expressions_aggregation_AggregationHandleDistinct
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_AggregationOperationState_proto
-                      quickstep_storage_HashTable
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleIdSequence
+                      quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
@@ -430,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeVectorTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -627,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTable
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_storage_TupleReference
-                      quickstep_storage_ValueAccessor
-                      quickstep_storage_ValueAccessorUtil
-                      quickstep_threading_SpinMutex
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_HashPair
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastHashTableFactory
-                      glog
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTable_proto
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
-                      quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_SeparateChainingHashTable
-                      quickstep_storage_SimpleScalarSeparateChainingHashTable
-                      quickstep_storage_TupleReference
-                      quickstep_types_TypeFactory
-                      quickstep_utility_Macros)
-target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_HashTable
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableKeyManager
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
-                      quickstep_storage_StorageManager
-                      quickstep_threading_SpinSharedMutex
-                      quickstep_types_Type
-                      quickstep_types_TypedValue
-                      quickstep_utility_Alignment
-                      quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_FileManager
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -731,16 +708,19 @@ target_link_libraries(quickstep_storage_HashTable
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTable_proto
                       quickstep_types_Type_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
                       quickstep_storage_LinearOpenAddressingHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
@@ -759,13 +739,10 @@ target_link_libraries(quickstep_storage_HashTableKeyManager
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTablePool
                       glog
-                      quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_threading_SpinMutex
-                      quickstep_utility_Macros
-                      quickstep_utility_StringUtil)
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_IndexSubBlock
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_PredicateCost
@@ -820,14 +797,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
-target_link_libraries(quickstep_storage_PartitionedHashTablePool
-                      glog
+target_link_libraries(quickstep_storage_PackedPayloadHashTable
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableKeyManager
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
+                      quickstep_threading_SpinSharedMutex
+                      quickstep_types_Type
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Alignment
+                      quickstep_utility_HashPair
                       quickstep_utility_Macros
-                      quickstep_utility_StringUtil)
+                      quickstep_utility_PrimeNumber)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+                      glog
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -936,7 +931,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       glog
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
@@ -945,7 +939,6 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
                       quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock
                       quickstep_storage_CountedReference
-                      quickstep_storage_HashTableBase
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_SMAIndexSubBlock
@@ -1068,6 +1061,10 @@ target_link_libraries(quickstep_storage_ValueAccessor
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_ValueAccessorMultiplexer
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ValueAccessorUtil
                       glog
                       quickstep_storage_BasicColumnStoreValueAccessor
@@ -1115,6 +1112,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1125,9 +1123,6 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_CompressedTupleStorageSubBlock
                       quickstep_storage_CountedReference
                       quickstep_storage_EvictionPolicy
-                      quickstep_storage_FastHashTable
-                      quickstep_storage_FastHashTableFactory
-                      quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal
                       quickstep_storage_Flags
@@ -1144,6 +1139,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PartitionedHashTablePool
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable
@@ -1166,6 +1162,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_storage_WindowAggregationOperationState
                       quickstep_storage_WindowAggregationOperationState_proto)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
new file mode 100644
index 0000000..d836014
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeVectorTable.hpp"
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+CollisionFreeVectorTable::CollisionFreeVectorTable(
+    const Type *key_type,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_type),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
+      storage_manager_(storage_manager) {
+  DCHECK_GT(num_entries, 0u);
+
+  std::size_t required_memory = 0;
+  const std::size_t existence_map_offset = 0;
+  std::vector<std::size_t> state_offsets;
+
+  required_memory += CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    state_offsets.emplace_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  void *memory_start = blob_->getMemoryMutable();
+  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
+      reinterpret_cast<char *>(memory_start) + existence_map_offset,
+      num_entries,
+      false /* initialize */));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    // Columnwise layout.
+    vec_tables_.emplace_back(
+        reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
+  }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
+}
+
+CollisionFreeVectorTable::~CollisionFreeVectorTable() {
+  const block_id blob_id = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+void CollisionFreeVectorTable::destroyPayload() {
+}
+
+bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK_EQ(1u, key_ids.size());
+
+  if (handles_.empty()) {
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        accessor_mux.getValueAccessorBySource(key_ids.front().source),
+        [&key_ids, this](auto *accessor) -> void {  // NOLINT(build/c++11)
+      this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(),
+                                             key_type_,
+                                             key_ids.front().attr_id,
+                                             accessor);
+    });
+    return true;
+  }
+
+  DCHECK(accessor_mux.getDerivedAccessor() == nullptr ||
+         accessor_mux.getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accesor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+  // Dispatch to specialized implementations to achieve maximum performance.
+  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+      base_accessor,
+      [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void {  // NOLINT(build/c++11)
+    const ValueAccessorSource key_source = key_ids.front().source;
+    const attribute_id key_id = key_ids.front().attr_id;
+    const bool is_key_nullable = key_type_->isNullable();
+
+    for (std::size_t i = 0; i < num_handles_; ++i) {
+      DCHECK_LE(argument_ids[i].size(), 1u);
+
+      const AggregationHandle *handle = handles_[i];
+      const auto &argument_types = handle->getArgumentTypes();
+      const auto &argument_ids_i = argument_ids[i];
+
+      ValueAccessorSource argument_source;
+      attribute_id argument_id;
+      const Type *argument_type;
+      bool is_argument_nullable;
+
+      if (argument_ids_i.empty()) {
+        argument_source = ValueAccessorSource::kInvalid;
+        argument_id = kInvalidAttributeID;
+
+        DCHECK(argument_types.empty());
+        argument_type = nullptr;
+        is_argument_nullable = false;
+      } else {
+        DCHECK_EQ(1u, argument_ids_i.size());
+        argument_source = argument_ids_i.front().source;
+        argument_id = argument_ids_i.front().attr_id;
+
+        DCHECK_EQ(1u, argument_types.size());
+        argument_type = argument_types.front();
+        is_argument_nullable = argument_type->isNullable();
+      }
+
+      if (key_source == ValueAccessorSource::kBase) {
+        if (argument_source == ValueAccessorSource::kBase) {
+          this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                         is_argument_nullable,
+                                                         key_type_,
+                                                         argument_type,
+                                                         handle->getAggregationID(),
+                                                         key_id,
+                                                         argument_id,
+                                                         vec_tables_[i],
+                                                         accessor,
+                                                         accessor);
+        } else {
+          this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                        is_argument_nullable,
+                                                        key_type_,
+                                                        argument_type,
+                                                        handle->getAggregationID(),
+                                                        key_id,
+                                                        argument_id,
+                                                        vec_tables_[i],
+                                                        accessor,
+                                                        derived_accesor);
+        }
+      } else {
+        if (argument_source == ValueAccessorSource::kBase) {
+          this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                        is_argument_nullable,
+                                                        key_type_,
+                                                        argument_type,
+                                                        handle->getAggregationID(),
+                                                        key_id,
+                                                        argument_id,
+                                                        vec_tables_[i],
+                                                        derived_accesor,
+                                                        accessor);
+        } else {
+          this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                         is_argument_nullable,
+                                                         key_type_,
+                                                         argument_type,
+                                                         handle->getAggregationID(),
+                                                         key_id,
+                                                         argument_id,
+                                                         vec_tables_[i],
+                                                         derived_accesor,
+                                                         derived_accesor);
+        }
+      }
+    }
+  });
+  return true;
+}
+
+void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
+                                           NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  switch (key_type_->getTypeID()) {
+    case TypeID::kInt:
+      finalizeKeyInternal<int>(start_position, end_position, output_cv);
+      return;
+    case TypeID::kLong:
+      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
+                                             const std::size_t handle_id,
+                                             NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+  const Type *argument_type =
+      argument_types.empty() ? nullptr : argument_types.front();
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              start_position,
+                              end_position,
+                              output_cv);
+}
+
+}  // namespace quickstep