Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/24 05:40:34 UTC

[3/3] incubator-quickstep git commit: New aggregation design.

New aggregation design.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc81c5b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc81c5b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc81c5b3

Branch: refs/heads/agg-expr
Commit: bc81c5b3fb8eb4c4bbce67be8247000e959df90e
Parents: 4be8e91
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Feb 22 13:58:08 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Feb 23 22:03:55 2017 -0600

----------------------------------------------------------------------
 expressions/aggregation/AggFunc.hpp    | 187 +++++++++
 expressions/aggregation/CMakeLists.txt |  10 +
 query_optimizer/ExecutionGenerator.cpp |   5 +-
 storage/AggregationOperationState.cpp  |  99 +----
 storage/AggregationOperationState.hpp  |   2 +-
 storage/CMakeLists.txt                 |   5 +-
 storage/CollisionFreeVectorTable.cpp   | 478 +++++++++++++++-------
 storage/CollisionFreeVectorTable.hpp   | 587 ++++++++--------------------
 storage/PackedPayloadHashTable.cpp     |   4 +
 storage/PackedPayloadHashTable.hpp     |  60 +++
 utility/BoolVector.hpp                 | 226 +++++++++++
 utility/CMakeLists.txt                 |   5 +
 12 files changed, 1007 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/AggFunc.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp
new file mode 100644
index 0000000..31f385e
--- /dev/null
+++ b/expressions/aggregation/AggFunc.hpp
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
+#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <type_traits>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "utility/Macros.hpp"
+#include "types/IntType.hpp"
+#include "types/LongType.hpp"
+#include "types/FloatType.hpp"
+#include "types/DoubleType.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class StorageManager;
+class Type;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+struct InvalidType {};
+
+template <typename T, typename U>
+struct is_different : std::true_type {};
+
+template <typename T>
+struct is_different<T, T> : std::false_type {};
+
+class Sum {
+ public:
+  Sum() {}
+
+  template <typename ArgType>
+  struct AggState {
+    typedef InvalidType T;
+    typedef InvalidType AtomicT;
+    typedef InvalidType ResultT;
+  };
+
+  template <typename ArgType>
+  struct HasAtomicImpl :
+      is_different<InvalidType,
+                   typename AggState<ArgType>::AtomicT> {};
+
+  template <typename ArgType>
+  inline static void MergeArgAtomic(const typename ArgType::cpptype &value,
+                                    typename AggState<ArgType>::AtomicT *state) {
+    LOG(FATAL) << "Not implemented";
+  }
+
+  template <typename ArgType>
+  inline static void FinalizeAtomic(const typename AggState<ArgType>::AtomicT &state,
+                                    typename AggState<ArgType>::ResultT *result) {
+    LOG(FATAL) << "Not implemented";
+  }
+
+  template <typename ArgType>
+  inline static void MergeArgUnsafe(const typename ArgType::cpptype &value,
+                                    typename AggState<ArgType>::T *state) {
+    *state += value;
+  }
+
+  template <typename ArgType>
+  inline static void FinalizeUnsafe(const typename AggState<ArgType>::T &state,
+                                    typename AggState<ArgType>::ResultT *result) {
+    *result = state;
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Sum);
+};
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for IntType
+template <>
+struct Sum::AggState<IntType> {
+  typedef std::int64_t T;
+  typedef std::atomic<std::int64_t> AtomicT;
+  typedef std::int64_t ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<IntType>(const IntType::cpptype &value,
+                                         AggState<IntType>::AtomicT *state) {
+  state->fetch_add(value, std::memory_order_relaxed);
+}
+
+template <>
+inline void Sum::FinalizeAtomic<IntType>(const AggState<IntType>::AtomicT &state,
+                                         AggState<IntType>::ResultT *result) {
+  *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for LongType
+template <>
+struct Sum::AggState<LongType> {
+  typedef std::int64_t T;
+  typedef std::atomic<std::int64_t> AtomicT;
+  typedef std::int64_t ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<LongType>(const LongType::cpptype &value,
+                                          AggState<LongType>::AtomicT *state) {
+  state->fetch_add(value, std::memory_order_relaxed);
+}
+
+template <>
+inline void Sum::FinalizeAtomic<LongType>(const AggState<LongType>::AtomicT &state,
+                                          AggState<LongType>::ResultT *result) {
+  *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for FloatType
+template <>
+struct Sum::AggState<FloatType> {
+  typedef double T;
+  typedef std::atomic<double> AtomicT;
+  typedef double ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<FloatType>(const FloatType::cpptype &value,
+                                           AggState<FloatType>::AtomicT *state) {
+  AggState<FloatType>::T state_val = state->load(std::memory_order_relaxed);
+  while (!state->compare_exchange_weak(state_val, state_val + value)) {}
+}
+
+template <>
+inline void Sum::FinalizeAtomic<FloatType>(const AggState<FloatType>::AtomicT &state,
+                                           AggState<FloatType>::ResultT *result) {
+  *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for DoubleType
+template <>
+struct Sum::AggState<DoubleType> {
+  typedef double T;
+  typedef std::atomic<double> AtomicT;
+  typedef double ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<DoubleType>(const DoubleType::cpptype &value,
+                                            AggState<DoubleType>::AtomicT *state) {
+  AggState<DoubleType>::T state_val = state->load(std::memory_order_relaxed);
+  while (!state->compare_exchange_weak(state_val, state_val + value)) {}
+}
+
+template <>
+inline void Sum::FinalizeAtomic<DoubleType>(const AggState<DoubleType>::AtomicT &state,
+                                            AggState<DoubleType>::ResultT *result) {
+  *result = state.load(std::memory_order_relaxed);
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
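
For reference, a minimal usage sketch of the Sum specializations above (this sketch is not part of the commit; the function name and literal values are illustrative, and the real call sites live in CollisionFreeVectorTable further down):

#include <atomic>
#include <cstdint>

#include "expressions/aggregation/AggFunc.hpp"

// Illustrative only: accumulate two values into an atomic state and finalize,
// using the IntType specialization (AggState<IntType>::AtomicT is
// std::atomic<std::int64_t>, ResultT is std::int64_t).
void SumSketch() {
  std::atomic<std::int64_t> state(0);
  quickstep::Sum::MergeArgAtomic<quickstep::IntType>(3, &state);  // relaxed fetch_add
  quickstep::Sum::MergeArgAtomic<quickstep::IntType>(4, &state);
  std::int64_t result;
  quickstep::Sum::FinalizeAtomic<quickstep::IntType>(state, &result);  // result == 7
}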

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 4220a8d..c0ebad7 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(expressions_aggregation_AggregateFunction_proto_srcs
                          AggregateFunction.proto)
 
 # Declare micro-libs:
+add_library(quickstep_expressions_aggregation_AggFunc ../../empty_src.cpp AggFunc.hpp)
 add_library(quickstep_expressions_aggregation_AggregateFunction
             AggregateFunction.cpp
             AggregateFunction.hpp)
@@ -69,6 +70,14 @@ add_library(quickstep_expressions_aggregation_AggregationID
             AggregationID.hpp)
 
 # Link dependencies:
+target_link_libraries(quickstep_expressions_aggregation_AggFunc
+                      glog
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_types_DoubleType
+                      quickstep_types_FloatType
+                      quickstep_types_IntType
+                      quickstep_types_LongType
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregateFunction
                       glog
                       quickstep_expressions_aggregation_AggregateFunction_proto
@@ -236,6 +245,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
 # Submodule all-in-one library:
 add_library(quickstep_expressions_aggregation ../../empty_src.cpp)
 target_link_libraries(quickstep_expressions_aggregation
+                      quickstep_expressions_aggregation_AggFunc
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_expressions_aggregation_AggregateFunctionAvg

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 70b69e0..19fc322 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -163,6 +163,8 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
+DEFINE_bool(use_collision_free_agg, true, "");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -1508,7 +1510,8 @@ void ExecutionGenerator::convertAggregate(
         cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
 
     std::size_t max_num_groups;
-    if (cost_model_for_aggregation_
+    if (FLAGS_use_collision_free_agg &&
+        cost_model_for_aggregation_
             ->canUseCollisionFreeAggregation(physical_plan,
                                              estimated_num_groups,
                                              &max_num_groups)) {
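
Note that the new use_collision_free_agg flag defaults to true, so planner behavior is unchanged unless it is overridden; presumably the collision-free path can be disabled through the usual gflags command line (e.g. --use_collision_free_agg=false) to force the partitioned or thread-private hash-table paths instead.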

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..00bb433 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -67,7 +67,7 @@ DEFINE_int32(num_aggregation_partitions,
              41,
              "The number of partitions used for performing the aggregation");
 DEFINE_int32(partition_aggregation_num_groups_threshold,
-             500000,
+             100,
              "The threshold used for deciding whether the aggregation is done "
              "in a partitioned way or not");
 
@@ -208,13 +208,13 @@ AggregationOperationState::AggregationOperationState(
               group_by_handles,
               storage_manager));
     } else if (is_aggregate_partitioned_) {
-      partitioned_group_by_hashtable_pool_.reset(
-          new PartitionedHashTablePool(estimated_num_entries,
-                                       FLAGS_num_aggregation_partitions,
-                                       hash_table_impl_type,
-                                       group_by_types_,
-                                       group_by_handles,
-                                       storage_manager));
+      partitioned_hashtable_.reset(
+          AggregationStateHashTableFactory::CreateResizable(
+              hash_table_impl_type,
+              group_by_types_,
+              estimated_num_entries,
+              group_by_handles,
+              storage_manager_));
     } else {
       group_by_hashtable_pool_.reset(
           new HashTablePool(estimated_num_entries,
@@ -406,7 +406,8 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
     return static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->getNumFinalizationPartitions();
   } else if (is_aggregate_partitioned_) {
-    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+    return static_cast<PackedPayloadHashTable *>(
+        partitioned_hashtable_.get())->getNumFinalizationPartitions();
   } else  {
     return 1u;
   }
@@ -549,62 +550,11 @@ void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
 
 void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
     const ValueAccessorMultiplexer &accessor_mux) {
-  DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
-
-  std::vector<attribute_id> group_by_key_ids;
-  for (const MultiSourceAttributeId &key_id : group_by_key_ids_) {
-    DCHECK(key_id.source == ValueAccessorSource::kBase);
-    group_by_key_ids.emplace_back(key_id.attr_id);
-  }
+  DCHECK(partitioned_hashtable_ != nullptr);
 
-  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      accessor_mux.getBaseAccessor(),
-      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-    // TODO(jianqiao): handle the situation when keys in non_trivial_results
-    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-
-    // Compute the partitions for the tuple formed by group by values.
-    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
-    partition_membership.resize(num_partitions);
-
-    // Create a tuple-id sequence for each partition.
-    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      partition_membership[partition].reset(
-          new TupleIdSequence(accessor->getEndPosition()));
-    }
-
-    // Iterate over ValueAccessor for each tuple,
-    // set a bit in the appropriate TupleIdSequence.
-    while (accessor->next()) {
-      // We need a unique_ptr because getTupleWithAttributes() uses "new".
-      std::unique_ptr<Tuple> curr_tuple(
-          accessor->getTupleWithAttributes(group_by_key_ids));
-      const std::size_t curr_tuple_partition_id =
-          curr_tuple->getTupleHash() % num_partitions;
-      partition_membership[curr_tuple_partition_id]->set(
-          accessor->getCurrentPosition(), true);
-    }
-
-    // Aggregate each partition.
-    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-      std::unique_ptr<ValueAccessor> base_adapter(
-          accessor->createSharedTupleIdSequenceAdapter(
-              *partition_membership[partition]));
-
-      std::unique_ptr<ValueAccessor> derived_adapter;
-      if (accessor_mux.getDerivedAccessor() != nullptr) {
-        derived_adapter.reset(
-            accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual(
-                *partition_membership[partition]));
-      }
-
-      ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
-      partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKey(argument_ids_,
-                                            group_by_key_ids_,
-                                            local_mux);
-    }
-  });
+  partitioned_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
+                                                           group_by_key_ids_,
+                                                           accessor_mux);
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
@@ -712,20 +662,18 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
   PackedPayloadHashTable *hash_table =
-      static_cast<PackedPayloadHashTable *>(
-          partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+      static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get());
 
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
   if (handles_.empty()) {
-    const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
-                                                 const std::uint8_t *dumb_placeholder) -> void {
+    hash_table->forEachCompositeKeyInPartition(
+        partition_id,
+        [&](std::vector<TypedValue> &group_by_key) -> void {
       group_by_keys.emplace_back(std::move(group_by_key));
-    };
-
-    hash_table->forEachCompositeKey(&keys_retriever);
+    });
   }
 
   // Collect per-aggregate finalized values.
@@ -737,15 +685,8 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
       final_values.emplace_back(agg_result_col);
     }
   }
-  hash_table->destroyPayload();
+//  hash_table->destroyPayload();
 
-  // Reorganize 'group_by_keys' in column-major order so that we can make a
-  // ColumnVectorsValueAccessor to bulk-insert results.
-  //
-  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
-  // if there is only one aggregate. The need to do this should hopefully go
-  // away when we work out storing composite structures for multiple aggregates
-  // in a single HashTable.
   std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
   std::size_t group_by_element_idx = 0;
   for (const Type *group_by_type : group_by_types_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..a75f243 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -296,7 +296,7 @@ class AggregationOperationState {
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
-  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+  std::unique_ptr<AggregationStateHashTableBase> partitioned_hashtable_;
 
   std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 293be17..fcc069b 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -436,7 +436,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
 target_link_libraries(quickstep_storage_CollisionFreeVectorTable
+                      ${GFLAGS_LIB_NAME}
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggFunc
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
@@ -447,11 +449,12 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeID
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
-                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_BoolVector
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_catalog_CatalogAttribute

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index d836014..c92f0ab 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -24,6 +24,7 @@
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
+#include <type_traits>
 #include <vector>
 
 #include "expressions/aggregation/AggregationHandle.hpp"
@@ -33,13 +34,175 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 #include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "types/TypeID.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/BoolVector.hpp"
 
 #include "glog/logging.h"
 
 namespace quickstep {
 
+DEFINE_uint64(vt_threadprivate_threshold, 1000000L, "");
+DEFINE_bool(use_latch, false, "");
+
+namespace {
+
+template <typename T>
+using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
+
+template <typename FunctorT>
+inline auto InvokeOnKeyType(const Type &type,
+                            const FunctorT &functor) {
+  switch (type.getTypeID()) {
+    case TypeID::kInt:
+      return functor(static_cast<const IntType&>(type));
+    case TypeID::kLong:
+      return functor(static_cast<const LongType&>(type));
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnType(const Type &type,
+                         const FunctorT &functor) {
+  switch (type.getTypeID()) {
+    case TypeID::kInt:
+      return functor(static_cast<const IntType&>(type));
+    case TypeID::kLong:
+      return functor(static_cast<const LongType&>(type));
+    case TypeID::kFloat:
+      return functor(static_cast<const FloatType&>(type));
+    case TypeID::kDouble:
+      return functor(static_cast<const DoubleType&>(type));
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBool(const bool &val,
+                         const FunctorT &functor) {
+  if (val) {
+    return functor(std::true_type());
+  } else {
+    return functor(std::false_type());
+  }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBools(const bool &val1,
+                          const bool &val2,
+                          const FunctorT &functor) {
+  if (val1) {
+    if (val2) {
+      return functor(std::true_type(), std::true_type());
+    } else {
+      return functor(std::true_type(), std::false_type());
+    }
+  } else {
+    if (val2) {
+      return functor(std::false_type(), std::true_type());
+    } else {
+      return functor(std::false_type(), std::false_type());
+    }
+  }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnAggFunc(const AggregationID &agg_id,
+                            const FunctorT &functor) {
+  switch (agg_id) {
+    case AggregationID::kSum: {
+      return functor(Sum());
+    }
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <typename FunctorT>
+inline auto InvokeIf(const std::true_type &val,
+                     const FunctorT &functor) {
+  return functor();
+}
+
+template <typename FunctorT>
+inline void InvokeIf(const std::false_type &val,
+                     const FunctorT &functor) {
+}
+
+//template <typename FunctorT>
+//inline void InvokeOnAggFuncIfApplicableToArgType(
+//    const AggregationID &agg_id,
+//    const Type &arg_type,
+//    const FunctorT &functor) {
+//  InvokeOnAggFunc(
+//      agg_id,
+//      [&](const auto &agg_func) -> void {
+//    InvokeOnType(
+//        arg_type,
+//        [&](const auto &arg_type) -> void {
+//      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+//      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+//
+//      InvokeIf(
+//          typename AggFuncT::template HasAtomicImpl<ArgT>(),
+//          [&]() -> void {
+//        functor(agg_func, arg_type);
+//      });
+//    });
+//  });
+//}
+
+template <typename FunctorT>
+inline void InvokeOnAggFuncWithArgType(
+    const AggregationID &agg_id,
+    const Type &arg_type,
+    const FunctorT &functor) {
+  InvokeOnAggFunc(
+      agg_id,
+      [&](const auto &agg_func) -> void {
+    InvokeOnType(
+        arg_type,
+        [&](const auto &arg_type) -> void {
+      functor(agg_func, arg_type);
+    });
+  });
+}
+
+template <typename FunctorT>
+inline auto InvokeOnTwoAccessors(
+    const ValueAccessorMultiplexer &accessor_mux,
+    const ValueAccessorSource &first_source,
+    const ValueAccessorSource &second_source,
+    const FunctorT &functor) {
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+  InvokeOnAnyValueAccessor(
+      base_accessor,
+      [&](auto *accessor) {
+    if (first_source == ValueAccessorSource::kBase) {
+      if (second_source == ValueAccessorSource::kBase) {
+        return functor(std::false_type(), accessor, accessor);
+      } else {
+        return functor(std::true_type(), accessor, derived_accessor);
+      }
+    } else {
+      if (second_source == ValueAccessorSource::kBase) {
+        return functor(std::true_type(), derived_accessor, accessor);
+      } else {
+        return functor(std::false_type(), derived_accessor, derived_accessor);
+      }
+    }
+  });
+}
+
+}  // namespace
+
 CollisionFreeVectorTable::CollisionFreeVectorTable(
     const Type *key_type,
     const std::size_t num_entries,
@@ -49,46 +212,45 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
       num_entries_(num_entries),
       num_handles_(handles.size()),
       handles_(handles),
+      use_thread_private_existence_map_(num_entries_ < FLAGS_vt_threadprivate_threshold),
       num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
       storage_manager_(storage_manager) {
   DCHECK_GT(num_entries, 0u);
 
   std::size_t required_memory = 0;
   const std::size_t existence_map_offset = 0;
+  std::size_t mutex_vec_offset = 0;
   std::vector<std::size_t> state_offsets;
 
-  required_memory += CacheLineAlignedBytes(
-      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+  if (!use_thread_private_existence_map_) {
+    required_memory += CacheLineAlignedBytes(
+        BarrieredReadWriteConcurrentBoolVector::BytesNeeded(num_entries));
+  }
+
+  if (FLAGS_use_latch) {
+    mutex_vec_offset = required_memory;
+    required_memory += CacheLineAlignedBytes(num_entries * sizeof(SpinMutex));
+  }
 
   for (std::size_t i = 0; i < num_handles_; ++i) {
     const AggregationHandle *handle = handles_[i];
     const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+    DCHECK_EQ(1u, argument_types.size());
 
     std::size_t state_size = 0;
-    switch (handle->getAggregationID()) {
-      case AggregationID::kCount: {
-        state_size = sizeof(std::atomic<std::size_t>);
-        break;
-      }
-      case AggregationID::kSum: {
-        DCHECK_EQ(1u, argument_types.size());
-        switch (argument_types.front()->getTypeID()) {
-          case TypeID::kInt:  // Fall through
-          case TypeID::kLong:
-            state_size = sizeof(std::atomic<std::int64_t>);
-            break;
-          case TypeID::kFloat:  // Fall through
-          case TypeID::kDouble:
-            state_size = sizeof(std::atomic<double>);
-            break;
-          default:
-            LOG(FATAL) << "Not implemented";
-        }
-        break;
+    InvokeOnAggFuncWithArgType(
+        handle->getAggregationID(),
+        *argument_types.front(),
+        [&](const auto &agg_func, const auto &arg_type) {
+      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+      if (FLAGS_use_latch) {
+        state_size = sizeof(typename AggFuncT::template AggState<ArgT>::T);
+      } else {
+        state_size = sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT);
       }
-      default:
-        LOG(FATAL) << "Not implemented";
-    }
+    });
 
     state_offsets.emplace_back(required_memory);
     required_memory += CacheLineAlignedBytes(state_size * num_entries);
@@ -101,10 +263,21 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
   blob_ = storage_manager_->getBlobMutable(blob_id);
 
   void *memory_start = blob_->getMemoryMutable();
-  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start) + existence_map_offset,
-      num_entries,
-      false /* initialize */));
+  if (use_thread_private_existence_map_) {
+    thread_private_existence_map_pool_.reset(new BoolVectorPool(num_entries));
+  } else {
+    concurrent_existence_map_.reset(new BarrieredReadWriteConcurrentBoolVector(
+        reinterpret_cast<char *>(memory_start) + existence_map_offset,
+        num_entries,
+        false /* initialize */));
+  }
+
+  if (FLAGS_use_latch) {
+    mutex_vec_ = reinterpret_cast<SpinMutex *>(
+        reinterpret_cast<char *>(memory_start) + mutex_vec_offset);
+  } else {
+    mutex_vec_ = nullptr;
+  }
 
   for (std::size_t i = 0; i < num_handles_; ++i) {
     // Columnwise layout.
@@ -132,113 +305,103 @@ bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
   DCHECK_EQ(1u, key_ids.size());
 
   if (handles_.empty()) {
-    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-        accessor_mux.getValueAccessorBySource(key_ids.front().source),
-        [&key_ids, this](auto *accessor) -> void {  // NOLINT(build/c++11)
-      this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(),
-                                             key_type_,
-                                             key_ids.front().attr_id,
-                                             accessor);
-    });
-    return true;
+    LOG(FATAL) << "Not implemented";
   }
 
-  DCHECK(accessor_mux.getDerivedAccessor() == nullptr ||
-         accessor_mux.getDerivedAccessor()->getImplementationType()
-             == ValueAccessor::Implementation::kColumnVectors);
+  const ValueAccessorSource key_source = key_ids.front().source;
+  const attribute_id key_id = key_ids.front().attr_id;
+  const bool is_key_nullable = key_type_->isNullable();
 
-  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
-  ColumnVectorsValueAccessor *derived_accesor =
-      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
 
-  // Dispatch to specialized implementations to achieve maximum performance.
-  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      base_accessor,
-      [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void {  // NOLINT(build/c++11)
-    const ValueAccessorSource key_source = key_ids.front().source;
-    const attribute_id key_id = key_ids.front().attr_id;
-    const bool is_key_nullable = key_type_->isNullable();
-
-    for (std::size_t i = 0; i < num_handles_; ++i) {
-      DCHECK_LE(argument_ids[i].size(), 1u);
-
-      const AggregationHandle *handle = handles_[i];
-      const auto &argument_types = handle->getArgumentTypes();
-      const auto &argument_ids_i = argument_ids[i];
-
-      ValueAccessorSource argument_source;
-      attribute_id argument_id;
-      const Type *argument_type;
-      bool is_argument_nullable;
-
-      if (argument_ids_i.empty()) {
-        argument_source = ValueAccessorSource::kInvalid;
-        argument_id = kInvalidAttributeID;
-
-        DCHECK(argument_types.empty());
-        argument_type = nullptr;
-        is_argument_nullable = false;
-      } else {
-        DCHECK_EQ(1u, argument_ids_i.size());
-        argument_source = argument_ids_i.front().source;
-        argument_id = argument_ids_i.front().attr_id;
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+    const auto &argument_ids_i = argument_ids[i];
+
+    ValueAccessorSource argument_source;
+    attribute_id argument_id;
+    const Type *argument_type;
+    bool is_argument_nullable;
+
+    if (argument_ids_i.empty()) {
+//      argument_source = ValueAccessorSource::kInvalid;
+//      argument_id = kInvalidAttributeID;
+//
+//      DCHECK(argument_types.empty());
+//      argument_type = nullptr;
+//      is_argument_nullable = false;
+      LOG(FATAL) << "Not supported";
+    } else {
+      DCHECK_EQ(1u, argument_ids_i.size());
+      argument_source = argument_ids_i.front().source;
+      argument_id = argument_ids_i.front().attr_id;
+
+      DCHECK_EQ(1u, argument_types.size());
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
 
-        DCHECK_EQ(1u, argument_types.size());
-        argument_type = argument_types.front();
-        is_argument_nullable = argument_type->isNullable();
-      }
+    InvokeOnAggFuncWithArgType(
+        handle->getAggregationID(),
+        *argument_types.front(),
+        [&](const auto &agg_func, const auto &arg_type) {
+      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+      InvokeOnKeyType(
+          *key_type_,
+          [&](const auto &key_type) -> void {
+        using KeyT = remove_const_reference_t<decltype(key_type)>;
+
+        InvokeOnBools(
+            is_key_nullable,
+            is_argument_nullable,
+            [&](const auto &is_key_nullable,
+                const auto &is_argument_nullable) -> void {
+          using KeyNullableT =
+              remove_const_reference_t<decltype(is_key_nullable)>;
+          using ArgNullableT =
+              remove_const_reference_t<decltype(is_argument_nullable)>;
+
+          InvokeOnTwoAccessors(
+              accessor_mux,
+              key_source,
+              argument_source,
+              [&](const auto &use_two_accessors,
+                  auto *key_accessor,
+                  auto *argument_accessor) {
+            using UseTwoAccessorsT =
+                remove_const_reference_t<decltype(use_two_accessors)>;
+
+            invokeOnExistenceMap(
+                [&](auto *existence_map) -> void {
+              if (FLAGS_use_latch) {
+                upsertValueAccessorInternalUnaryLatch<
+                    AggFuncT, KeyT, ArgT,
+                    KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>(
+                        key_id,
+                        argument_id,
+                        vec_tables_[i],
+                        existence_map,
+                        key_accessor,
+                        argument_accessor);
+              } else {
+                upsertValueAccessorInternalUnaryAtomic<
+                    AggFuncT, KeyT, ArgT,
+                    KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>(
+                        key_id,
+                        argument_id,
+                        vec_tables_[i],
+                        existence_map,
+                        key_accessor,
+                        argument_accessor);
+              }
+            });
+          });
+        });
+      });
+    });
+  }
 
-      if (key_source == ValueAccessorSource::kBase) {
-        if (argument_source == ValueAccessorSource::kBase) {
-          this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                         is_argument_nullable,
-                                                         key_type_,
-                                                         argument_type,
-                                                         handle->getAggregationID(),
-                                                         key_id,
-                                                         argument_id,
-                                                         vec_tables_[i],
-                                                         accessor,
-                                                         accessor);
-        } else {
-          this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                        is_argument_nullable,
-                                                        key_type_,
-                                                        argument_type,
-                                                        handle->getAggregationID(),
-                                                        key_id,
-                                                        argument_id,
-                                                        vec_tables_[i],
-                                                        accessor,
-                                                        derived_accesor);
-        }
-      } else {
-        if (argument_source == ValueAccessorSource::kBase) {
-          this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                        is_argument_nullable,
-                                                        key_type_,
-                                                        argument_type,
-                                                        handle->getAggregationID(),
-                                                        key_id,
-                                                        argument_id,
-                                                        vec_tables_[i],
-                                                        derived_accesor,
-                                                        accessor);
-        } else {
-          this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                         is_argument_nullable,
-                                                         key_type_,
-                                                         argument_type,
-                                                         handle->getAggregationID(),
-                                                         key_id,
-                                                         argument_id,
-                                                         vec_tables_[i],
-                                                         derived_accesor,
-                                                         derived_accesor);
-        }
-      }
-    }
-  });
   return true;
 }
 
@@ -249,16 +412,17 @@ void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
   const std::size_t end_position =
       calculatePartitionEndPosition(partition_id);
 
-  switch (key_type_->getTypeID()) {
-    case TypeID::kInt:
-      finalizeKeyInternal<int>(start_position, end_position, output_cv);
-      return;
-    case TypeID::kLong:
-      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
+  InvokeOnKeyType(
+      *key_type_,
+      [&](const auto &key_type) {
+    using KeyT = remove_const_reference_t<decltype(key_type)>;
+
+    invokeOnExistenceMapFinal(
+        [&](const auto *existence_map) -> void {
+      finalizeKeyInternal<typename KeyT::cpptype>(
+          start_position, end_position, existence_map, output_cv);
+    });
+  });
 }
 
 void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
@@ -274,12 +438,32 @@ void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
   const Type *argument_type =
       argument_types.empty() ? nullptr : argument_types.front();
 
-  finalizeStateDispatchHelper(handle->getAggregationID(),
-                              argument_type,
-                              vec_tables_[handle_id],
-                              start_position,
-                              end_position,
-                              output_cv);
+  DCHECK(argument_type != nullptr);
+
+  InvokeOnAggFuncWithArgType(
+      handle->getAggregationID(),
+      *argument_type,
+      [&](const auto &agg_func, const auto &arg_type) {
+    using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+    using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+    invokeOnExistenceMapFinal(
+        [&](const auto *existence_map) -> void {
+      if (FLAGS_use_latch) {
+        finalizeStateInternalLatch<AggFuncT, ArgT>(start_position,
+                                                   end_position,
+                                                   vec_tables_[handle_id],
+                                                   existence_map,
+                                                   output_cv);
+      } else {
+        finalizeStateInternalAtomic<AggFuncT, ArgT>(start_position,
+                                                    end_position,
+                                                    vec_tables_[handle_id],
+                                                    existence_map,
+                                                    output_cv);
+      }
+    });
+  });
 }
 
 }  // namespace quickstep
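
A note on the dispatch idiom in the anonymous namespace above: InvokeOnBool / InvokeOnBools map a runtime bool onto std::true_type / std::false_type, so downstream helpers receive the flag as a compile-time constant and the compiler can prune the dead branch. A minimal self-contained sketch (not part of the commit; the main() driver and printed strings are illustrative only):

#include <iostream>
#include <type_traits>

// Same shape as the InvokeOnBool helper in CollisionFreeVectorTable.cpp:
// the runtime flag picks which instantiation of the generic lambda runs.
template <typename FunctorT>
inline auto InvokeOnBool(const bool &val, const FunctorT &functor) {
  if (val) {
    return functor(std::true_type());
  } else {
    return functor(std::false_type());
  }
}

int main() {
  const bool is_key_nullable = false;  // imagine this comes from the catalog
  InvokeOnBool(is_key_nullable, [](const auto is_nullable) {
    // std::decay_t<decltype(is_nullable)>::value is a compile-time constant,
    // so it can be forwarded as a non-type template argument, as the real
    // code does for upsertValueAccessorInternalUnaryAtomic/Latch.
    if (std::decay_t<decltype(is_nullable)>::value) {
      std::cout << "nullable key path\n";
    } else {
      std::cout << "non-nullable key path\n";
    }
  });
  return 0;
}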

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 772d47d..79020fb 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -29,22 +29,27 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageConstants.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
+#include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeID.hpp"
 #include "types/containers/ColumnVector.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/BoolVector.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
+
 #include "glog/logging.h"
 
 namespace quickstep {
 
 class AggregationHandle;
+class BarrieredReadWriteConcurrentBitVector;
 class StorageManager;
 
 /** \addtogroup Storage
@@ -101,7 +106,17 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
         calculatePartitionStartPosition(partition_id);
     const std::size_t end_position =
         calculatePartitionEndPosition(partition_id);
-    return existence_map_->onesCountInRange(start_position, end_position);
+
+    if (use_thread_private_existence_map_) {
+      auto &bool_vectors = thread_private_existence_map_pool_->getAll();
+      auto &target_bv = bool_vectors.front();
+      for (std::size_t i = 1; i < bool_vectors.size(); ++i) {
+        target_bv->unionWith(*bool_vectors[i], start_position, end_position);
+      }
+      return target_bv->onesCountInRange(start_position, end_position);
+    } else {
+      return concurrent_existence_map_->onesCountInRange(start_position, end_position);
+    }
   }
 
   /**
@@ -110,7 +125,7 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
    * @return The existence map for this vector table.
    */
   inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
-    return existence_map_.get();
+    return nullptr;
   }
 
   /**
@@ -214,115 +229,67 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
                     num_entries_);
   }
 
-  template <bool use_two_accessors, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const bool is_key_nullable,
-      const bool is_argument_nullable,
-      ArgTypes &&...args);
+  template <typename FunctorT>
+  inline void invokeOnExistenceMap(const FunctorT &functor) {
+    if (use_thread_private_existence_map_) {
+      BoolVector *existence_map = thread_private_existence_map_pool_->checkOut();
+      functor(existence_map);
+      thread_private_existence_map_pool_->checkIn(existence_map);
+    } else {
+      functor(concurrent_existence_map_.get());
+    }
+  }
 
-  template <bool ...bool_values, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *argument_type,
-      const AggregationID agg_id,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountHelper(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorSumHelper(
-      const Type *argument_type,
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename ...ArgTypes>
-  inline void upsertValueAccessorKeyOnlyHelper(
-      const bool is_key_nullable,
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorKeyOnly(
-      const attribute_id key_attr_id,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorCountNullary(
-      const attribute_id key_attr_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountUnary(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorIntegerSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorGenericSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename KeyT>
+  template <typename FunctorT>
+  inline void invokeOnExistenceMapFinal(const FunctorT &functor) const {
+    if (use_thread_private_existence_map_) {
+      const BoolVector *existence_map =
+          thread_private_existence_map_pool_->getAll().front().get();
+      functor(existence_map);
+    } else {
+      functor(concurrent_existence_map_.get());
+    }
+  }
+
+  template <typename AggFuncT, typename KeyT, typename ArgT,
+            bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+            typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
+  inline void upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id,
+                                                     const attribute_id argument_id,
+                                                     void *vec_table,
+                                                     BoolVectorT *existence_map,
+                                                     KeyAccessorT *key_accessor,
+                                                     ArgAccessorT *argument_accessor);
+
+  template <typename AggFuncT, typename KeyT, typename ArgT,
+            bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+            typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
+  inline void upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id,
+                                                    const attribute_id argument_id,
+                                                    void *vec_table,
+                                                    BoolVectorT *existence_map,
+                                                    KeyAccessorT *key_accessor,
+                                                    ArgAccessorT *argument_accessor);
+
+  template <typename KeyT, typename BoolVectorT>
   inline void finalizeKeyInternal(const std::size_t start_position,
                                   const std::size_t end_position,
+                                  BoolVectorT *existence_map,
                                   NativeColumnVector *output_cv) const;
 
-  template <typename ...ArgTypes>
-  inline void finalizeStateDispatchHelper(const AggregationID agg_id,
-                                          const Type *argument_type,
+  template <typename AggFuncT, typename ArgT, typename BoolVectorT>
+  inline void finalizeStateInternalAtomic(const std::size_t start_position,
+                                          const std::size_t end_position,
                                           const void *vec_table,
-                                          ArgTypes &&...args) const;
-
-  template <typename ...ArgTypes>
-  inline void finalizeStateSumHelper(const Type *argument_type,
-                                     const void *vec_table,
-                                     ArgTypes &&...args) const;
-
-  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
-                                 const std::size_t start_position,
-                                 const std::size_t end_position,
-                                 NativeColumnVector *output_cv) const;
+                                          BoolVectorT *existence_map,
+                                          NativeColumnVector *output_cv) const;
 
-  template <typename ResultT, typename StateT>
-  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
-                               const std::size_t start_position,
-                               const std::size_t end_position,
-                               NativeColumnVector *output_cv) const;
+  template <typename AggFuncT, typename ArgT, typename BoolVectorT>
+  inline void finalizeStateInternalLatch(const std::size_t start_position,
+                                         const std::size_t end_position,
+                                         const void *vec_table,
+                                         BoolVectorT *existence_map,
+                                         NativeColumnVector *output_cv) const;
 
   const Type *key_type_;
   const std::size_t num_entries_;
@@ -330,8 +297,12 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   const std::size_t num_handles_;
   const std::vector<AggregationHandle *> handles_;
 
-  std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
+  const bool use_thread_private_existence_map_;
+  std::unique_ptr<BarrieredReadWriteConcurrentBoolVector> concurrent_existence_map_;
+  std::unique_ptr<BoolVectorPool> thread_private_existence_map_pool_;
+
   std::vector<void *> vec_tables_;
+  SpinMutex *mutex_vec_;
 
   const std::size_t num_finalize_partitions_;
 
@@ -347,392 +318,144 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
 // ----------------------------------------------------------------------------
 // Implementations of template methods follow.
 
-template <bool use_two_accessors, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
-                                        const bool is_argument_nullable,
-                                        ArgTypes &&...args) {
-  if (is_key_nullable) {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  } else {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  }
-}
-
-template <bool ...bool_values, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorDispatchHelper(const Type *key_type,
-                                        ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorDispatchHelper<bool_values..., int>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorDispatchHelper(const Type *argument_type,
-                                        const AggregationID agg_id,
-                                        ArgTypes &&...args) {
-  switch (agg_id) {
-     case AggregationID::kCount:
-       upsertValueAccessorCountHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               std::forward<ArgTypes>(args)...);
-       return;
-     case AggregationID::kSum:
-       upsertValueAccessorSumHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               argument_type, std::forward<ArgTypes>(args)...);
-       return;
-     default:
-       LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
-                                       const Type *key_type,
-                                       ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    case TypeID::kLong: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
-                                 ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    existence_map_->setBit(*key);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
-                                     const attribute_id argument_id,
-                                     void *vec_table,
-                                     KeyValueAccessorT *key_accessor,
-                                     ArgumentValueAccessorT *argument_accessor) {
-  DCHECK_GE(key_attr_id, 0);
-
-  if (is_argument_nullable && argument_id != kInvalidAttributeID) {
-    upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
-        key_attr_id,
-        argument_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor,
-        argument_accessor);
-    return;
-  } else {
-    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
-        key_attr_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor);
-    return;
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+template <typename AggFuncT, typename KeyT, typename ArgT,
+          bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+          typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::upsertValueAccessorSumHelper(const Type *argument_type,
-                                   const attribute_id key_attr_id,
-                                   const attribute_id argument_id,
-                                   void *vec_table,
-                                   KeyValueAccessorT *key_accessor,
-                                   ArgumentValueAccessorT *argument_accessor) {
-  DCHECK_GE(key_attr_id, 0);
-  DCHECK_GE(argument_id, 0);
-  DCHECK(argument_type != nullptr);
-
-  switch (argument_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kFloat:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kDouble:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
+    ::upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id,
+                                             const attribute_id argument_id,
+                                             void *vec_table,
+                                             BoolVectorT *existence_map,
+                                             KeyAccessorT *key_accessor,
+                                             ArgAccessorT *argument_accessor) {
+  auto *states = static_cast<
+      typename AggFuncT::template AggState<ArgT>::AtomicT *>(vec_table);
 
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
-                                      std::atomic<std::size_t> *vec_table,
-                                      ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-    existence_map_->setBit(loc);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<std::size_t> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
   key_accessor->beginIteration();
   if (use_two_accessors) {
     argument_accessor->beginIteration();
   }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-  }
-}
 
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
   while (key_accessor->next()) {
     if (use_two_accessors) {
       argument_accessor->next();
     }
-    const KeyT *key = static_cast<const KeyT *>(
+
+    const auto *key = static_cast<const typename KeyT::cpptype *>(
         key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
     if (is_key_nullable && key == nullptr) {
       continue;
     }
     const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
+    existence_map->set(loc);
+
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
         argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
     if (is_argument_nullable && argument == nullptr) {
       continue;
     }
-    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+
+    AggFuncT::template MergeArgAtomic<ArgT>(*argument, states + loc);
   }
 }
 
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+template <typename AggFuncT, typename KeyT, typename ArgT,
+          bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+          typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
+    ::upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id,
+                                            const attribute_id argument_id,
+                                            void *vec_table,
+                                            BoolVectorT *existence_map,
+                                            KeyAccessorT *key_accessor,
+                                            ArgAccessorT *argument_accessor) {
+  auto *states = static_cast<
+      typename AggFuncT::template AggState<ArgT>::T *>(vec_table);
+
   key_accessor->beginIteration();
   if (use_two_accessors) {
     argument_accessor->beginIteration();
   }
+
   while (key_accessor->next()) {
     if (use_two_accessors) {
       argument_accessor->next();
     }
-    const KeyT *key = static_cast<const KeyT *>(
+
+    const auto *key = static_cast<const typename KeyT::cpptype *>(
         key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
     if (is_key_nullable && key == nullptr) {
       continue;
     }
     const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
+    existence_map->set(loc);
+
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
         argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
     if (is_argument_nullable && argument == nullptr) {
       continue;
     }
-    const ArgumentT arg_val = *argument;
-    std::atomic<StateT> &state = vec_table[loc];
-    StateT state_val = state.load(std::memory_order_relaxed);
-    while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+
+    SpinMutexLock lock(mutex_vec_[loc]);
+    AggFuncT::template MergeArgUnsafe<ArgT>(*argument, states + loc);
   }
 }
 
-template <typename KeyT>
+template <typename KeyT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
     ::finalizeKeyInternal(const std::size_t start_position,
                           const std::size_t end_position,
+                          BoolVectorT *existence_map,
                           NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+    }
   }
 }
 
-template <typename ...ArgTypes>
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::finalizeStateDispatchHelper(const AggregationID agg_id,
-                                  const Type *argument_type,
+    ::finalizeStateInternalAtomic(const std::size_t start_position,
+                                  const std::size_t end_position,
                                   const void *vec_table,
-                                  ArgTypes &&...args) const {
-  switch (agg_id) {
-     case AggregationID::kCount:
-       finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
-                          std::forward<ArgTypes>(args)...);
-       return;
-     case AggregationID::kSum:
-       finalizeStateSumHelper(argument_type,
-                              vec_table,
-                              std::forward<ArgTypes>(args)...);
-       return;
-     default:
-       LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::finalizeStateSumHelper(const Type *argument_type,
-                             const void *vec_table,
-                             ArgTypes &&...args) const {
-  DCHECK(argument_type != nullptr);
-
-  switch (argument_type->getTypeID()) {
-    case TypeID::kInt:    // Fall through
-    case TypeID::kLong:
-      finalizeStateSum<std::int64_t>(
-          static_cast<const std::atomic<std::int64_t> *>(vec_table),
-          std::forward<ArgTypes>(args)...);
-      return;
-    case TypeID::kFloat:  // Fall through
-    case TypeID::kDouble:
-      finalizeStateSum<double>(
-          static_cast<const std::atomic<double> *>(vec_table),
-          std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
+                                  BoolVectorT *existence_map,
+                                  NativeColumnVector *output_cv) const {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using ResultT = typename StateT::ResultT;
+
+  const auto *states = static_cast<const typename StateT::AtomicT *>(vec_table);
+
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      AggFuncT::template FinalizeAtomic<ArgT>(
+          states[loc],
+          static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+    }
   }
 }
 
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::finalizeStateCount(const std::atomic<std::size_t> *vec_table,
-                         const std::size_t start_position,
-                         const std::size_t end_position,
-                         NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
-        vec_table[loc].load(std::memory_order_relaxed);
+    ::finalizeStateInternalLatch(const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 const void *vec_table,
+                                 BoolVectorT *existence_map,
+                                 NativeColumnVector *output_cv) const {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using ResultT = typename StateT::ResultT;
+
+  const auto *states = static_cast<const typename StateT::T *>(vec_table);
+
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      AggFuncT::template FinalizeUnsafe<ArgT>(
+          states[loc],
+          static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+    }
   }
 }
 
-template <typename ResultT, typename StateT>
-inline void CollisionFreeVectorTable
-    ::finalizeStateSum(const std::atomic<StateT> *vec_table,
-                       const std::size_t start_position,
-                       const std::size_t end_position,
-                       NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
-        vec_table[loc].load(std::memory_order_relaxed);
-  }
-}
 
 }  // namespace quickstep
 

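Note on the rewritten helpers above: the old per-AggregationID / per-TypeID dispatch is replaced by templates parameterized on an AggFuncT policy that supplies, per argument Type ArgT, a plain state type (AggState<ArgT>::T), an atomic state type (AggState<ArgT>::AtomicT), a result type (AggState<ArgT>::ResultT), and the MergeArgAtomic / MergeArgUnsafe / FinalizeAtomic / FinalizeUnsafe operations. The sketch below only illustrates that contract for a SUM-like aggregate; SketchSumAggFunc is a made-up name, and the real definitions live in expressions/aggregation/AggFunc.hpp (added by this commit) and may differ in detail.

#include <atomic>

// Hypothetical SUM-like policy, shown only to illustrate the interface the
// CollisionFreeVectorTable templates above expect; not the real AggFunc.hpp.
struct SketchSumAggFunc {
  // ArgT is expected to be a quickstep Type class (e.g. LongType), whose
  // cpptype member names the underlying C++ value type.
  template <typename ArgT>
  struct AggState {
    typedef typename ArgT::cpptype T;                     // latch-protected state
    typedef std::atomic<typename ArgT::cpptype> AtomicT;  // lock-free state
    typedef typename ArgT::cpptype ResultT;               // value written to output
  };

  // Used by upsertValueAccessorInternalUnaryAtomic: merge one argument value
  // into the atomic state with a relaxed CAS loop (works for float/double,
  // where fetch_add is unavailable).
  template <typename ArgT>
  inline static void MergeArgAtomic(const typename ArgT::cpptype &value,
                                    typename AggState<ArgT>::AtomicT *state) {
    auto expected = state->load(std::memory_order_relaxed);
    while (!state->compare_exchange_weak(expected, expected + value,
                                         std::memory_order_relaxed)) {}
  }

  // Used by upsertValueAccessorInternalUnaryLatch: the caller already holds
  // the per-slot SpinMutex, so a plain read-modify-write is safe.
  template <typename ArgT>
  inline static void MergeArgUnsafe(const typename ArgT::cpptype &value,
                                    typename AggState<ArgT>::T *state) {
    *state += value;
  }

  template <typename ArgT>
  inline static void FinalizeAtomic(const typename AggState<ArgT>::AtomicT &state,
                                    typename AggState<ArgT>::ResultT *result) {
    *result = state.load(std::memory_order_relaxed);
  }

  template <typename ArgT>
  inline static void FinalizeUnsafe(const typename AggState<ArgT>::T &state,
                                    typename AggState<ArgT>::ResultT *result) {
    *result = state;
  }
};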
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..bd7e960 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -256,6 +256,10 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
 void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
                                     const std::size_t extra_variable_storage,
                                     const std::size_t retry_num) {
+  LOG(FATAL) << "Resize " << numEntries() << " + "
+             << extra_buckets << " + " << extra_variable_storage
+             << " -- " << header_->num_buckets;
+
   // A retry should never be necessary with this implementation of HashTable.
   // Separate chaining ensures that any resized hash table with more buckets
   // than the original table will be able to hold more entries than the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..9ba5500 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -95,6 +95,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   void destroyPayload() override;
 
+  inline std::size_t getNumFinalizationPartitions() const {
+    return CalculateNumFinalizationPartitions(numEntries());
+  }
+
   /**
    * @brief Use aggregation handles to update (multiple) aggregation states in
    *        this hash table, with group-by keys and arguments drawn from the
@@ -287,6 +291,11 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   template <typename FunctorT>
   inline std::size_t forEachCompositeKey(FunctorT *functor) const;
 
+  template <typename FunctorT>
+  inline void forEachCompositeKeyInPartition(
+      const std::size_t partition_id,
+      const FunctorT &functor) const;
+
   /**
    * @brief Apply a functor to each (key, aggregation state) pair in this hash
    *        table, where the aggregation state is retrieved from the value
@@ -328,6 +337,25 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
     return total;
   }
 
+  inline std::size_t calculatePartitionLength() const {
+    const std::size_t num_finalize_partitions = getNumFinalizationPartitions();
+    const std::size_t partition_length =
+        (numEntries() + num_finalize_partitions - 1) / num_finalize_partitions;
+    DCHECK_GE(partition_length, 0u);
+    return partition_length;
+  }
+
+  inline std::size_t calculatePartitionStartPosition(
+      const std::size_t partition_id) const {
+    return calculatePartitionLength() * partition_id;
+  }
+
+  inline std::size_t calculatePartitionEndPosition(
+      const std::size_t partition_id) const {
+    return std::min(calculatePartitionLength() * (partition_id + 1),
+                    numEntries());
+  }
+
   inline bool getNextEntry(TypedValue *key,
                            const std::uint8_t **value,
                            std::size_t *entry_num) const;
@@ -438,6 +466,15 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
            kBucketAlignment;
   }
 
+  inline static std::size_t CalculateNumFinalizationPartitions(
+      const std::size_t num_entries) {
+    // Set the finalization segment size to 4096 entries.
+    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+    // At least 1 partition, at most 80 partitions.
+    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+  }
+
   // Attempt to find an empty bucket to insert 'hash_code' into, starting after
   // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
   // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
@@ -975,6 +1012,29 @@ inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
 }
 
 template <typename FunctorT>
+inline void PackedPayloadHashTable::forEachCompositeKeyInPartition(
+    const std::size_t partition_id,
+    const FunctorT &functor) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  std::vector<TypedValue> key;
+  for (std::size_t i = start_position; i < end_position; ++i) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + i * bucket_size_;
+    for (std::vector<const Type *>::size_type key_idx = 0;
+         key_idx < this->key_types_.size();
+         ++key_idx) {
+      key.emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+    }
+    functor(key);
+    key.clear();
+  }
+}
+
+template <typename FunctorT>
 inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
     FunctorT *functor,
     const std::size_t index) const {

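The partitioning helpers added above (getNumFinalizationPartitions, calculatePartitionLength, calculatePartitionStartPosition/EndPosition, forEachCompositeKeyInPartition) split the bucket array into roughly 4096-entry segments, capped at 80 partitions, so finalization can be spread over multiple workers. The standalone snippet below only mirrors that arithmetic for a concrete table size; NumFinalizationPartitions and the surrounding main are illustrative names, not part of the patch.

#include <algorithm>
#include <cstddef>
#include <iostream>

// Mirrors CalculateNumFinalizationPartitions above: ~4096 entries per
// finalization segment, at least 1 and at most 80 partitions.
std::size_t NumFinalizationPartitions(const std::size_t num_entries) {
  constexpr std::size_t kFinalizeSegmentSize = 4096;
  return std::max<std::size_t>(
      1, std::min<std::size_t>(num_entries / kFinalizeSegmentSize, 80));
}

int main() {
  const std::size_t num_entries = 1000000;
  const std::size_t num_partitions = NumFinalizationPartitions(num_entries);  // 80
  const std::size_t partition_length =
      (num_entries + num_partitions - 1) / num_partitions;                    // 12500
  for (std::size_t partition_id = 0; partition_id < num_partitions; ++partition_id) {
    const std::size_t start = partition_length * partition_id;
    const std::size_t end =
        std::min(partition_length * (partition_id + 1), num_entries);
    // A finalization worker for partition_id would visit buckets [start, end).
    if (partition_id < 2 || partition_id == num_partitions - 1) {
      std::cout << "partition " << partition_id << ": ["
                << start << ", " << end << ")\n";
    }
  }
  return 0;
}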
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/BoolVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BoolVector.hpp b/utility/BoolVector.hpp
new file mode 100644
index 0000000..1f16fc7
--- /dev/null
+++ b/utility/BoolVector.hpp
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+#define QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "threading/SpinMutex.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class BoolVector {
+ public:
+  BoolVector(void *memory_location,
+             const std::size_t length,
+             const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<bool *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  explicit BoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<bool *>(std::malloc(sizeof(bool) * length))) {
+    DCHECK_GT(length, 0u);
+    clear();
+  }
+
+  ~BoolVector() {
+    if (owned_) {
+      std::free(data_array_);
+    }
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, sizeof(bool) * length_);
+  }
+
+  inline void set(const std::size_t loc) {
+    data_array_[loc] = true;
+  }
+
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc];
+  }
+
+  inline void unionWith(const BoolVector &other,
+                        const std::size_t start_position,
+                        const std::size_t end_position) const {
+    for (std::size_t loc = start_position; loc < end_position; ++loc) {
+      data_array_[loc] |= other.data_array_[loc];
+    }
+  }
+
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, length_);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i];
+    }
+    return count;
+  }
+
+ private:
+  const bool owned_;
+  const std::size_t length_;
+  bool *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVector);
+};
+
+class BoolVectorPool {
+ public:
+  explicit BoolVectorPool(const std::size_t vector_length)
+      : vector_length_(vector_length) {}
+
+  BoolVector* checkOut() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!pool_.empty()) {
+        BoolVector *ret = pool_.back().release();
+        pool_.pop_back();
+        return ret;
+      }
+    }
+    return new BoolVector(vector_length_);
+  }
+
+  void checkIn(BoolVector *bool_vector) {
+    SpinMutexLock lock(mutex_);
+    pool_.emplace_back(bool_vector);
+  }
+
+  std::vector<std::unique_ptr<BoolVector>>& getAll() {
+    return pool_;
+  }
+
+  const std::vector<std::unique_ptr<BoolVector>>& getAll() const {
+    return pool_;
+  }
+
+ private:
+  const std::size_t vector_length_;
+
+  SpinMutex mutex_;
+  std::vector<std::unique_ptr<BoolVector>> pool_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVectorPool);
+};
+
+class BarrieredReadWriteConcurrentBoolVector {
+ public:
+  BarrieredReadWriteConcurrentBoolVector(void *memory_location,
+                                         const std::size_t length,
+                                         const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<DataType *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  explicit BarrieredReadWriteConcurrentBoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<DataType *>(std::malloc(BytesNeeded(length)))) {
+    DCHECK_GT(length, 0u);
+    clear();
+  }
+
+  ~BarrieredReadWriteConcurrentBoolVector() {
+    if (owned_) {
+      std::free(data_array_);
+    }
+  }
+
+  inline static std::size_t BytesNeeded(const std::size_t length) {
+    return kDataSize * length;
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, BytesNeeded(length_));
+  }
+
+  inline void set(const std::size_t loc) {
+    data_array_[loc].store(true, std::memory_order_relaxed);
+  }
+
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc].load(std::memory_order_relaxed);
+  }
+
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, length_);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i].load(std::memory_order_relaxed);
+    }
+    return count;
+  }
+
+ private:
+  typedef std::atomic<bool> DataType;
+  static constexpr std::size_t kDataSize = sizeof(DataType);
+
+  const bool owned_;
+  const std::size_t length_;
+  DataType *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BarrieredReadWriteConcurrentBoolVector);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_

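BoolVector, BoolVectorPool and BarrieredReadWriteConcurrentBoolVector give the aggregation table two ways to track which group-by keys were actually seen: a shared atomic existence map, or one plain BoolVector per worker, pooled and unioned at finalization time. The snippet below is a hypothetical driver (MarkGroupsSeen and MergeAndCountGroups are made-up names, not part of the patch) showing the checkOut / set / checkIn / unionWith pattern the pool is built for; it assumes num_entries matches the pooled vectors' length and every group id is within range.

#include <cstddef>

#include "utility/BoolVector.hpp"

using quickstep::BoolVector;
using quickstep::BoolVectorPool;

// Each upsert worker borrows a thread-private existence map, marks the groups
// it touched, and returns the map to the pool.
void MarkGroupsSeen(BoolVectorPool *pool,
                    const std::size_t *group_ids,
                    const std::size_t num_tuples) {
  BoolVector *local_map = pool->checkOut();
  for (std::size_t i = 0; i < num_tuples; ++i) {
    local_map->set(group_ids[i]);
  }
  pool->checkIn(local_map);
}

// At finalization time, one thread ORs every pooled map into a merged map and
// counts the distinct groups in the covered range.
std::size_t MergeAndCountGroups(BoolVectorPool *pool,
                                BoolVector *merged,
                                const std::size_t num_entries) {
  for (const auto &local_map : pool->getAll()) {
    merged->unionWith(*local_map, 0, num_entries);
  }
  return merged->onesCountInRange(0, num_entries);
}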
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..e9a978e 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
 add_library(quickstep_utility_BloomFilter_proto
             ${quickstep_utility_BloomFilter_proto_srcs}
             ${quickstep_utility_BloomFilter_proto_hdrs})
+add_library(quickstep_utility_BoolVector ../empty_src.cpp BoolVector.hpp)
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -233,6 +234,9 @@ target_link_libraries(quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_BloomFilter_proto
                       ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_utility_BoolVector
+                      quickstep_threading_SpinMutex
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
@@ -341,6 +345,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
                       quickstep_utility_BloomFilter_proto
+                      quickstep_utility_BoolVector
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf