You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) is not preserved in this plain-text rendering.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/24 05:40:34 UTC
[3/3] incubator-quickstep git commit: New aggregation design.
New aggregation design.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc81c5b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc81c5b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc81c5b3
Branch: refs/heads/agg-expr
Commit: bc81c5b3fb8eb4c4bbce67be8247000e959df90e
Parents: 4be8e91
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Feb 22 13:58:08 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Feb 23 22:03:55 2017 -0600
----------------------------------------------------------------------
expressions/aggregation/AggFunc.hpp | 187 +++++++++
expressions/aggregation/CMakeLists.txt | 10 +
query_optimizer/ExecutionGenerator.cpp | 5 +-
storage/AggregationOperationState.cpp | 99 +----
storage/AggregationOperationState.hpp | 2 +-
storage/CMakeLists.txt | 5 +-
storage/CollisionFreeVectorTable.cpp | 478 +++++++++++++++-------
storage/CollisionFreeVectorTable.hpp | 587 ++++++++--------------------
storage/PackedPayloadHashTable.cpp | 4 +
storage/PackedPayloadHashTable.hpp | 60 +++
utility/BoolVector.hpp | 226 +++++++++++
utility/CMakeLists.txt | 5 +
12 files changed, 1007 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/AggFunc.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp
new file mode 100644
index 0000000..31f385e
--- /dev/null
+++ b/expressions/aggregation/AggFunc.hpp
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
+#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <type_traits>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "utility/Macros.hpp"
+#include "types/IntType.hpp"
+#include "types/LongType.hpp"
+#include "types/FloatType.hpp"
+#include "types/DoubleType.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class StorageManager;
+class Type;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+struct InvalidType {};
+
+template <typename T, typename U>
+struct is_different : std::true_type {};
+
+template <typename T>
+struct is_different<T, T> : std::false_type {};
+
+class Sum {
+ public:
+ Sum() {}
+
+ template <typename ArgType>
+ struct AggState {
+ typedef InvalidType T;
+ typedef InvalidType AtomicT;
+ typedef InvalidType ResultT;
+ };
+
+ template <typename ArgType>
+ struct HasAtomicImpl :
+ is_different<InvalidType,
+ typename AggState<ArgType>::AtomicT> {};
+
+ template <typename ArgType>
+ inline static void MergeArgAtomic(const typename ArgType::cpptype &value,
+ typename AggState<ArgType>::AtomicT *state) {
+ LOG(FATAL) << "Not implemented";
+ }
+
+ template <typename ArgType>
+ inline static void FinalizeAtomic(const typename AggState<ArgType>::AtomicT &state,
+ typename AggState<ArgType>::ResultT *result) {
+ LOG(FATAL) << "Not implemented";
+ }
+
+ template <typename ArgType>
+ inline static void MergeArgUnsafe(const typename ArgType::cpptype &value,
+ typename AggState<ArgType>::T *state) {
+ *state += value;
+ }
+
+ template <typename ArgType>
+ inline static void FinalizeUnsafe(const typename AggState<ArgType>::T &state,
+ typename AggState<ArgType>::ResultT *result) {
+ *result = state;
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Sum);
+};
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for IntType
+template <>
+struct Sum::AggState<IntType> {
+ typedef std::int64_t T;
+ typedef std::atomic<std::int64_t> AtomicT;
+ typedef std::int64_t ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<IntType>(const IntType::cpptype &value,
+ AggState<IntType>::AtomicT *state) {
+ state->fetch_add(value, std::memory_order_relaxed);
+}
+
+template <>
+inline void Sum::FinalizeAtomic<IntType>(const AggState<IntType>::AtomicT &state,
+ AggState<IntType>::ResultT *result) {
+ *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for LongType
+template <>
+struct Sum::AggState<LongType> {
+ typedef std::int64_t T;
+ typedef std::atomic<std::int64_t> AtomicT;
+ typedef std::int64_t ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<LongType>(const LongType::cpptype &value,
+ AggState<LongType>::AtomicT *state) {
+ state->fetch_add(value, std::memory_order_relaxed);
+}
+
+template <>
+inline void Sum::FinalizeAtomic<LongType>(const AggState<LongType>::AtomicT &state,
+ AggState<LongType>::ResultT *result) {
+ *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for FloatType
+template <>
+struct Sum::AggState<FloatType> {
+ typedef double T;
+ typedef std::atomic<double> AtomicT;
+ typedef double ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<FloatType>(const FloatType::cpptype &value,
+ AggState<FloatType>::AtomicT *state) {
+ AggState<FloatType>::T state_val = state->load(std::memory_order_relaxed);
+ while (!state->compare_exchange_weak(state_val, state_val + value)) {}
+}
+
+template <>
+inline void Sum::FinalizeAtomic<FloatType>(const AggState<FloatType>::AtomicT &state,
+ AggState<FloatType>::ResultT *result) {
+ *result = state.load(std::memory_order_relaxed);
+}
+
+//------------------------------------------------------------------------------
+// Implementation of Sum for DoubleType
+template <>
+struct Sum::AggState<DoubleType> {
+ typedef double T;
+ typedef std::atomic<double> AtomicT;
+ typedef double ResultT;
+};
+
+template <>
+inline void Sum::MergeArgAtomic<DoubleType>(const DoubleType::cpptype &value,
+ AggState<DoubleType>::AtomicT *state) {
+ AggState<DoubleType>::T state_val = state->load(std::memory_order_relaxed);
+ while (!state->compare_exchange_weak(state_val, state_val + value)) {}
+}
+
+template <>
+inline void Sum::FinalizeAtomic<DoubleType>(const AggState<DoubleType>::AtomicT &state,
+ AggState<DoubleType>::ResultT *result) {
+ *result = state.load(std::memory_order_relaxed);
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 4220a8d..c0ebad7 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(expressions_aggregation_AggregateFunction_proto_srcs
AggregateFunction.proto)
# Declare micro-libs:
+add_library(quickstep_expressions_aggregation_AggFunc ../../empty_src.cpp AggFunc.hpp)
add_library(quickstep_expressions_aggregation_AggregateFunction
AggregateFunction.cpp
AggregateFunction.hpp)
@@ -69,6 +70,14 @@ add_library(quickstep_expressions_aggregation_AggregationID
AggregationID.hpp)
# Link dependencies:
+target_link_libraries(quickstep_expressions_aggregation_AggFunc
+ glog
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_types_DoubleType
+ quickstep_types_FloatType
+ quickstep_types_IntType
+ quickstep_types_LongType
+ quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregateFunction
glog
quickstep_expressions_aggregation_AggregateFunction_proto
@@ -236,6 +245,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
# Submodule all-in-one library:
add_library(quickstep_expressions_aggregation ../../empty_src.cpp)
target_link_libraries(quickstep_expressions_aggregation
+ quickstep_expressions_aggregation_AggFunc
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_expressions_aggregation_AggregateFunctionAvg
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 70b69e0..19fc322 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -163,6 +163,8 @@ static const volatile bool aggregate_hashtable_type_dummy
DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
+DEFINE_bool(use_collision_free_agg, true, "");
+
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
namespace S = ::quickstep::serialization;
@@ -1508,7 +1510,8 @@ void ExecutionGenerator::convertAggregate(
cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
std::size_t max_num_groups;
- if (cost_model_for_aggregation_
+ if (FLAGS_use_collision_free_agg &&
+ cost_model_for_aggregation_
->canUseCollisionFreeAggregation(physical_plan,
estimated_num_groups,
&max_num_groups)) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..00bb433 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -67,7 +67,7 @@ DEFINE_int32(num_aggregation_partitions,
41,
"The number of partitions used for performing the aggregation");
DEFINE_int32(partition_aggregation_num_groups_threshold,
- 500000,
+ 100,
"The threshold used for deciding whether the aggregation is done "
"in a partitioned way or not");
@@ -208,13 +208,13 @@ AggregationOperationState::AggregationOperationState(
group_by_handles,
storage_manager));
} else if (is_aggregate_partitioned_) {
- partitioned_group_by_hashtable_pool_.reset(
- new PartitionedHashTablePool(estimated_num_entries,
- FLAGS_num_aggregation_partitions,
- hash_table_impl_type,
- group_by_types_,
- group_by_handles,
- storage_manager));
+ partitioned_hashtable_.reset(
+ AggregationStateHashTableFactory::CreateResizable(
+ hash_table_impl_type,
+ group_by_types_,
+ estimated_num_entries,
+ group_by_handles,
+ storage_manager_));
} else {
group_by_hashtable_pool_.reset(
new HashTablePool(estimated_num_entries,
@@ -406,7 +406,8 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->getNumFinalizationPartitions();
} else if (is_aggregate_partitioned_) {
- return partitioned_group_by_hashtable_pool_->getNumPartitions();
+ return static_cast<PackedPayloadHashTable *>(
+ partitioned_hashtable_.get())->getNumFinalizationPartitions();
} else {
return 1u;
}
@@ -549,62 +550,11 @@ void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
const ValueAccessorMultiplexer &accessor_mux) {
- DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
-
- std::vector<attribute_id> group_by_key_ids;
- for (const MultiSourceAttributeId &key_id : group_by_key_ids_) {
- DCHECK(key_id.source == ValueAccessorSource::kBase);
- group_by_key_ids.emplace_back(key_id.attr_id);
- }
+ DCHECK(partitioned_hashtable_ != nullptr);
- InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- accessor_mux.getBaseAccessor(),
- [&](auto *accessor) -> void { // NOLINT(build/c++11)
- // TODO(jianqiao): handle the situation when keys in non_trivial_results
- const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-
- // Compute the partitions for the tuple formed by group by values.
- std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
- partition_membership.resize(num_partitions);
-
- // Create a tuple-id sequence for each partition.
- for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- partition_membership[partition].reset(
- new TupleIdSequence(accessor->getEndPosition()));
- }
-
- // Iterate over ValueAccessor for each tuple,
- // set a bit in the appropriate TupleIdSequence.
- while (accessor->next()) {
- // We need a unique_ptr because getTupleWithAttributes() uses "new".
- std::unique_ptr<Tuple> curr_tuple(
- accessor->getTupleWithAttributes(group_by_key_ids));
- const std::size_t curr_tuple_partition_id =
- curr_tuple->getTupleHash() % num_partitions;
- partition_membership[curr_tuple_partition_id]->set(
- accessor->getCurrentPosition(), true);
- }
-
- // Aggregate each partition.
- for (std::size_t partition = 0; partition < num_partitions; ++partition) {
- std::unique_ptr<ValueAccessor> base_adapter(
- accessor->createSharedTupleIdSequenceAdapter(
- *partition_membership[partition]));
-
- std::unique_ptr<ValueAccessor> derived_adapter;
- if (accessor_mux.getDerivedAccessor() != nullptr) {
- derived_adapter.reset(
- accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual(
- *partition_membership[partition]));
- }
-
- ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
- partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKey(argument_ids_,
- group_by_key_ids_,
- local_mux);
- }
- });
+ partitioned_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ accessor_mux);
}
void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
@@ -712,20 +662,18 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
const std::size_t partition_id,
InsertDestination *output_destination) {
PackedPayloadHashTable *hash_table =
- static_cast<PackedPayloadHashTable *>(
- partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+ static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get());
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
if (handles_.empty()) {
- const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
- const std::uint8_t *dumb_placeholder) -> void {
+ hash_table->forEachCompositeKeyInPartition(
+ partition_id,
+ [&](std::vector<TypedValue> &group_by_key) -> void {
group_by_keys.emplace_back(std::move(group_by_key));
- };
-
- hash_table->forEachCompositeKey(&keys_retriever);
+ });
}
// Collect per-aggregate finalized values.
@@ -737,15 +685,8 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
final_values.emplace_back(agg_result_col);
}
}
- hash_table->destroyPayload();
+// hash_table->destroyPayload();
- // Reorganize 'group_by_keys' in column-major order so that we can make a
- // ColumnVectorsValueAccessor to bulk-insert results.
- //
- // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
- // if there is only one aggregate. The need to do this should hopefully go
- // away when we work out storing composite structures for multiple aggregates
- // in a single HashTable.
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
for (const Type *group_by_type : group_by_types_) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..a75f243 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -296,7 +296,7 @@ class AggregationOperationState {
// A vector of group by hash table pools.
std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
- std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+ std::unique_ptr<AggregationStateHashTableBase> partitioned_hashtable_;
std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 293be17..fcc069b 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -436,7 +436,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
endif()
# CMAKE_VALIDATE_IGNORE_END
target_link_libraries(quickstep_storage_CollisionFreeVectorTable
+ ${GFLAGS_LIB_NAME}
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggFunc
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
@@ -447,11 +449,12 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorMultiplexer
quickstep_storage_ValueAccessorUtil
+ quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeID
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
- quickstep_utility_BarrieredReadWriteConcurrentBitVector
+ quickstep_utility_BoolVector
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_ColumnStoreUtil
quickstep_catalog_CatalogAttribute
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index d836014..c92f0ab 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -24,6 +24,7 @@
#include <cstdint>
#include <cstdlib>
#include <memory>
+#include <type_traits>
#include <vector>
#include "expressions/aggregation/AggregationHandle.hpp"
@@ -33,13 +34,175 @@
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
+#include "threading/SpinMutex.hpp"
+#include "types/TypeID.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/BoolVector.hpp"
#include "glog/logging.h"
namespace quickstep {
+DEFINE_uint64(vt_threadprivate_threshold, 1000000L, "");
+DEFINE_bool(use_latch, false, "");
+
+namespace {
+
+template <typename T>
+using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
+
+template <typename FunctorT>
+inline auto InvokeOnKeyType(const Type &type,
+ const FunctorT &functor) {
+ switch (type.getTypeID()) {
+ case TypeID::kInt:
+ return functor(static_cast<const IntType&>(type));
+ case TypeID::kLong:
+ return functor(static_cast<const LongType&>(type));
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnType(const Type &type,
+ const FunctorT &functor) {
+ switch (type.getTypeID()) {
+ case TypeID::kInt:
+ return functor(static_cast<const IntType&>(type));
+ case TypeID::kLong:
+ return functor(static_cast<const LongType&>(type));
+ case TypeID::kFloat:
+ return functor(static_cast<const FloatType&>(type));
+ case TypeID::kDouble:
+ return functor(static_cast<const DoubleType&>(type));
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBool(const bool &val,
+ const FunctorT &functor) {
+ if (val) {
+ return functor(std::true_type());
+ } else {
+ return functor(std::false_type());
+ }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBools(const bool &val1,
+ const bool &val2,
+ const FunctorT &functor) {
+ if (val1) {
+ if (val2) {
+ return functor(std::true_type(), std::true_type());
+ } else {
+ return functor(std::true_type(), std::false_type());
+ }
+ } else {
+ if (val2) {
+ return functor(std::false_type(), std::true_type());
+ } else {
+ return functor(std::false_type(), std::false_type());
+ }
+ }
+}
+
+template <typename FunctorT>
+inline auto InvokeOnAggFunc(const AggregationID &agg_id,
+ const FunctorT &functor) {
+ switch (agg_id) {
+ case AggregationID::kSum: {
+ return functor(Sum());
+ }
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+template <typename FunctorT>
+inline auto InvokeIf(const std::true_type &val,
+ const FunctorT &functor) {
+ return functor();
+}
+
+template <typename FunctorT>
+inline void InvokeIf(const std::false_type &val,
+ const FunctorT &functor) {
+}
+
+//template <typename FunctorT>
+//inline void InvokeOnAggFuncIfApplicableToArgType(
+// const AggregationID &agg_id,
+// const Type &arg_type,
+// const FunctorT &functor) {
+// InvokeOnAggFunc(
+// agg_id,
+// [&](const auto &agg_func) -> void {
+// InvokeOnType(
+// arg_type,
+// [&](const auto &arg_type) -> void {
+// using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+// using ArgT = remove_const_reference_t<decltype(arg_type)>;
+//
+// InvokeIf(
+// typename AggFuncT::template HasAtomicImpl<ArgT>(),
+// [&]() -> void {
+// functor(agg_func, arg_type);
+// });
+// });
+// });
+//}
+
+template <typename FunctorT>
+inline void InvokeOnAggFuncWithArgType(
+ const AggregationID &agg_id,
+ const Type &arg_type,
+ const FunctorT &functor) {
+ InvokeOnAggFunc(
+ agg_id,
+ [&](const auto &agg_func) -> void {
+ InvokeOnType(
+ arg_type,
+ [&](const auto &arg_type) -> void {
+ functor(agg_func, arg_type);
+ });
+ });
+}
+
+template <typename FunctorT>
+inline auto InvokeOnTwoAccessors(
+ const ValueAccessorMultiplexer &accessor_mux,
+ const ValueAccessorSource &first_source,
+ const ValueAccessorSource &second_source,
+ const FunctorT &functor) {
+ ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+ ColumnVectorsValueAccessor *derived_accessor =
+ static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+ InvokeOnAnyValueAccessor(
+ base_accessor,
+ [&](auto *accessor) {
+ if (first_source == ValueAccessorSource::kBase) {
+ if (second_source == ValueAccessorSource::kBase) {
+ return functor(std::false_type(), accessor, accessor);
+ } else {
+ return functor(std::true_type(), accessor, derived_accessor);
+ }
+ } else {
+ if (second_source == ValueAccessorSource::kBase) {
+ return functor(std::true_type(), derived_accessor, accessor);
+ } else {
+ return functor(std::false_type(), derived_accessor, derived_accessor);
+ }
+ }
+ });
+}
+
+} // namespace
+
CollisionFreeVectorTable::CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
@@ -49,46 +212,45 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
num_entries_(num_entries),
num_handles_(handles.size()),
handles_(handles),
+ use_thread_private_existence_map_(num_entries_ < FLAGS_vt_threadprivate_threshold),
num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
storage_manager_(storage_manager) {
DCHECK_GT(num_entries, 0u);
std::size_t required_memory = 0;
const std::size_t existence_map_offset = 0;
+ std::size_t mutex_vec_offset = 0;
std::vector<std::size_t> state_offsets;
- required_memory += CacheLineAlignedBytes(
- BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+ if (!use_thread_private_existence_map_) {
+ required_memory += CacheLineAlignedBytes(
+ BarrieredReadWriteConcurrentBoolVector::BytesNeeded(num_entries));
+ }
+
+ if (FLAGS_use_latch) {
+ mutex_vec_offset = required_memory;
+ required_memory += CacheLineAlignedBytes(num_entries * sizeof(SpinMutex));
+ }
for (std::size_t i = 0; i < num_handles_; ++i) {
const AggregationHandle *handle = handles_[i];
const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+ DCHECK_EQ(1u, argument_types.size());
std::size_t state_size = 0;
- switch (handle->getAggregationID()) {
- case AggregationID::kCount: {
- state_size = sizeof(std::atomic<std::size_t>);
- break;
- }
- case AggregationID::kSum: {
- DCHECK_EQ(1u, argument_types.size());
- switch (argument_types.front()->getTypeID()) {
- case TypeID::kInt: // Fall through
- case TypeID::kLong:
- state_size = sizeof(std::atomic<std::int64_t>);
- break;
- case TypeID::kFloat: // Fall through
- case TypeID::kDouble:
- state_size = sizeof(std::atomic<double>);
- break;
- default:
- LOG(FATAL) << "Not implemented";
- }
- break;
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *argument_types.front(),
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ if (FLAGS_use_latch) {
+ state_size = sizeof(typename AggFuncT::template AggState<ArgT>::T);
+ } else {
+ state_size = sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT);
}
- default:
- LOG(FATAL) << "Not implemented";
- }
+ });
state_offsets.emplace_back(required_memory);
required_memory += CacheLineAlignedBytes(state_size * num_entries);
@@ -101,10 +263,21 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
blob_ = storage_manager_->getBlobMutable(blob_id);
void *memory_start = blob_->getMemoryMutable();
- existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
- reinterpret_cast<char *>(memory_start) + existence_map_offset,
- num_entries,
- false /* initialize */));
+ if (use_thread_private_existence_map_) {
+ thread_private_existence_map_pool_.reset(new BoolVectorPool(num_entries));
+ } else {
+ concurrent_existence_map_.reset(new BarrieredReadWriteConcurrentBoolVector(
+ reinterpret_cast<char *>(memory_start) + existence_map_offset,
+ num_entries,
+ false /* initialize */));
+ }
+
+ if (FLAGS_use_latch) {
+ mutex_vec_ = reinterpret_cast<SpinMutex *>(
+ reinterpret_cast<char *>(memory_start) + mutex_vec_offset);
+ } else {
+ mutex_vec_ = nullptr;
+ }
for (std::size_t i = 0; i < num_handles_; ++i) {
// Columnwise layout.
@@ -132,113 +305,103 @@ bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
DCHECK_EQ(1u, key_ids.size());
if (handles_.empty()) {
- InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- accessor_mux.getValueAccessorBySource(key_ids.front().source),
- [&key_ids, this](auto *accessor) -> void { // NOLINT(build/c++11)
- this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(),
- key_type_,
- key_ids.front().attr_id,
- accessor);
- });
- return true;
+ LOG(FATAL) << "Not implemented";
}
- DCHECK(accessor_mux.getDerivedAccessor() == nullptr ||
- accessor_mux.getDerivedAccessor()->getImplementationType()
- == ValueAccessor::Implementation::kColumnVectors);
+ const ValueAccessorSource key_source = key_ids.front().source;
+ const attribute_id key_id = key_ids.front().attr_id;
+ const bool is_key_nullable = key_type_->isNullable();
- ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
- ColumnVectorsValueAccessor *derived_accesor =
- static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+ for (std::size_t i = 0; i < num_handles_; ++i) {
+ DCHECK_LE(argument_ids[i].size(), 1u);
- // Dispatch to specialized implementations to achieve maximum performance.
- InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
- base_accessor,
- [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void { // NOLINT(build/c++11)
- const ValueAccessorSource key_source = key_ids.front().source;
- const attribute_id key_id = key_ids.front().attr_id;
- const bool is_key_nullable = key_type_->isNullable();
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- DCHECK_LE(argument_ids[i].size(), 1u);
-
- const AggregationHandle *handle = handles_[i];
- const auto &argument_types = handle->getArgumentTypes();
- const auto &argument_ids_i = argument_ids[i];
-
- ValueAccessorSource argument_source;
- attribute_id argument_id;
- const Type *argument_type;
- bool is_argument_nullable;
-
- if (argument_ids_i.empty()) {
- argument_source = ValueAccessorSource::kInvalid;
- argument_id = kInvalidAttributeID;
-
- DCHECK(argument_types.empty());
- argument_type = nullptr;
- is_argument_nullable = false;
- } else {
- DCHECK_EQ(1u, argument_ids_i.size());
- argument_source = argument_ids_i.front().source;
- argument_id = argument_ids_i.front().attr_id;
+ const AggregationHandle *handle = handles_[i];
+ const auto &argument_types = handle->getArgumentTypes();
+ const auto &argument_ids_i = argument_ids[i];
+
+ ValueAccessorSource argument_source;
+ attribute_id argument_id;
+ const Type *argument_type;
+ bool is_argument_nullable;
+
+ if (argument_ids_i.empty()) {
+// argument_source = ValueAccessorSource::kInvalid;
+// argument_id = kInvalidAttributeID;
+//
+// DCHECK(argument_types.empty());
+// argument_type = nullptr;
+// is_argument_nullable = false;
+ LOG(FATAL) << "Not supported";
+ } else {
+ DCHECK_EQ(1u, argument_ids_i.size());
+ argument_source = argument_ids_i.front().source;
+ argument_id = argument_ids_i.front().attr_id;
+
+ DCHECK_EQ(1u, argument_types.size());
+ argument_type = argument_types.front();
+ is_argument_nullable = argument_type->isNullable();
+ }
- DCHECK_EQ(1u, argument_types.size());
- argument_type = argument_types.front();
- is_argument_nullable = argument_type->isNullable();
- }
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *argument_types.front(),
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ InvokeOnKeyType(
+ *key_type_,
+ [&](const auto &key_type) -> void {
+ using KeyT = remove_const_reference_t<decltype(key_type)>;
+
+ InvokeOnBools(
+ is_key_nullable,
+ is_argument_nullable,
+ [&](const auto &is_key_nullable,
+ const auto &is_argument_nullable) -> void {
+ using KeyNullableT =
+ remove_const_reference_t<decltype(is_key_nullable)>;
+ using ArgNullableT =
+ remove_const_reference_t<decltype(is_argument_nullable)>;
+
+ InvokeOnTwoAccessors(
+ accessor_mux,
+ key_source,
+ argument_source,
+ [&](const auto &use_two_accessors,
+ auto *key_accessor,
+ auto *argument_accessor) {
+ using UseTwoAccessorsT =
+ remove_const_reference_t<decltype(use_two_accessors)>;
+
+ invokeOnExistenceMap(
+ [&](auto *existence_map) -> void {
+ if (FLAGS_use_latch) {
+ upsertValueAccessorInternalUnaryLatch<
+ AggFuncT, KeyT, ArgT,
+ KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id,
+ argument_id,
+ vec_tables_[i],
+ existence_map,
+ key_accessor,
+ argument_accessor);
+ } else {
+ upsertValueAccessorInternalUnaryAtomic<
+ AggFuncT, KeyT, ArgT,
+ KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id,
+ argument_id,
+ vec_tables_[i],
+ existence_map,
+ key_accessor,
+ argument_accessor);
+ }
+ });
+ });
+ });
+ });
+ });
+ }
- if (key_source == ValueAccessorSource::kBase) {
- if (argument_source == ValueAccessorSource::kBase) {
- this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- accessor,
- accessor);
- } else {
- this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- accessor,
- derived_accesor);
- }
- } else {
- if (argument_source == ValueAccessorSource::kBase) {
- this->upsertValueAccessorDispatchHelper<true>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- derived_accesor,
- accessor);
- } else {
- this->upsertValueAccessorDispatchHelper<false>(is_key_nullable,
- is_argument_nullable,
- key_type_,
- argument_type,
- handle->getAggregationID(),
- key_id,
- argument_id,
- vec_tables_[i],
- derived_accesor,
- derived_accesor);
- }
- }
- }
- });
return true;
}
@@ -249,16 +412,17 @@ void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
const std::size_t end_position =
calculatePartitionEndPosition(partition_id);
- switch (key_type_->getTypeID()) {
- case TypeID::kInt:
- finalizeKeyInternal<int>(start_position, end_position, output_cv);
- return;
- case TypeID::kLong:
- finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
+ // Dispatch on the concrete key type, then emit every key in
+ // [start_position, end_position) whose existence-map entry is set.
+ InvokeOnKeyType(
+ *key_type_,
+ [&](const auto &key_type) {
+ using KeyT = remove_const_reference_t<decltype(key_type)>;
+
+ invokeOnExistenceMapFinal(
+ [&](const auto *existence_map) -> void {
+ finalizeKeyInternal<typename KeyT::cpptype>(
+ start_position, end_position, existence_map, output_cv);
+ });
+ });
}
void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
@@ -274,12 +438,32 @@ void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
const Type *argument_type =
argument_types.empty() ? nullptr : argument_types.front();
- finalizeStateDispatchHelper(handle->getAggregationID(),
- argument_type,
- vec_tables_[handle_id],
- start_position,
- end_position,
- output_cv);
+ // NOTE(review): the replaced dispatch finalized COUNT without consulting
+ // argument_type, which is null when argument_types is empty; confirm that a
+ // nullary COUNT(*) can no longer reach this DCHECK and the dereference below.
+ DCHECK(argument_type != nullptr);
+
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *argument_type,
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ invokeOnExistenceMapFinal(
+ [&](const auto *existence_map) -> void {
+ // FLAGS_use_latch selects the state layout (latch-protected plain
+ // states vs. atomic states), matching the upsert path.
+ if (FLAGS_use_latch) {
+ finalizeStateInternalLatch<AggFuncT, ArgT>(start_position,
+ end_position,
+ vec_tables_[handle_id],
+ existence_map,
+ output_cv);
+ } else {
+ finalizeStateInternalAtomic<AggFuncT, ArgT>(start_position,
+ end_position,
+ vec_tables_[handle_id],
+ existence_map,
+ output_cv);
+ }
+ });
+ });
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 772d47d..79020fb 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -29,22 +29,27 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageConstants.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
+#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypeID.hpp"
#include "types/containers/ColumnVector.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+#include "utility/BoolVector.hpp"
#include "utility/Macros.hpp"
+#include "gflags/gflags.h"
+
#include "glog/logging.h"
namespace quickstep {
class AggregationHandle;
+class BarrieredReadWriteConcurrentBitVector;
class StorageManager;
/** \addtogroup Storage
@@ -101,7 +106,17 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
calculatePartitionStartPosition(partition_id);
const std::size_t end_position =
calculatePartitionEndPosition(partition_id);
- return existence_map_->onesCountInRange(start_position, end_position);
+
+ // Thread-private mode: unionWith() every other pooled map into the first one
+ // over this partition's range, then count in that first map, which is the one
+ // invokeOnExistenceMapFinal() hands to the finalize step.
+ // NOTE(review): this count therefore has a side effect that finalization
+ // presumably depends on (call order to confirm), and front() assumes getAll()
+ // is non-empty.
+ if (use_thread_private_existence_map_) {
+ auto &bool_vectors = thread_private_existence_map_pool_->getAll();
+ auto &target_bv = bool_vectors.front();
+ for (std::size_t i = 1; i < bool_vectors.size(); ++i) {
+ target_bv->unionWith(*bool_vectors[i], start_position, end_position);
+ }
+ return target_bv->onesCountInRange(start_position, end_position);
+ } else {
+ return concurrent_existence_map_->onesCountInRange(start_position, end_position);
+ }
}
/**
@@ -110,7 +125,7 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
* @return The existence map for this vector table.
*/
inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
- return existence_map_.get();
+ // NOTE(review): the existence map is no longer stored as a
+ // BarrieredReadWriteConcurrentBitVector, so this always returns nullptr and
+ // the @return text above is stale; confirm no caller dereferences it.
+ return nullptr;
}
/**
@@ -214,115 +229,67 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
num_entries_);
}
- template <bool use_two_accessors, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const bool is_key_nullable,
- const bool is_argument_nullable,
- ArgTypes &&...args);
+ // Runs functor on the existence map to update: a BoolVector checked out of
+ // thread_private_existence_map_pool_ (checked back in after functor returns),
+ // or the shared concurrent map.
+ // NOTE(review): not exception-safe; if functor throws, the checked-out map is
+ // never returned to the pool.
+ template <typename FunctorT>
+ inline void invokeOnExistenceMap(const FunctorT &functor) {
+ if (use_thread_private_existence_map_) {
+ BoolVector *existence_map = thread_private_existence_map_pool_->checkOut();
+ functor(existence_map);
+ thread_private_existence_map_pool_->checkIn(existence_map);
+ } else {
+ functor(concurrent_existence_map_.get());
+ }
+ }
- template <bool ...bool_values, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const Type *key_type,
- ArgTypes &&...args);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ...ArgTypes>
- inline void upsertValueAccessorDispatchHelper(
- const Type *argument_type,
- const AggregationID agg_id,
- ArgTypes &&...args);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorCountHelper(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorSumHelper(
- const Type *argument_type,
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <typename ...ArgTypes>
- inline void upsertValueAccessorKeyOnlyHelper(
- const bool is_key_nullable,
- const Type *key_type,
- ArgTypes &&...args);
-
- template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
- inline void upsertValueAccessorKeyOnly(
- const attribute_id key_attr_id,
- KeyValueAccessorT *key_accessor);
-
- template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
- inline void upsertValueAccessorCountNullary(
- const attribute_id key_attr_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorCountUnary(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorIntegerSum(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
- inline void upsertValueAccessorGenericSum(
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor);
-
- template <typename KeyT>
+ // Runs functor on the existence map read during finalization: the first
+ // BoolVector returned by the pool's getAll() (thread-private mode), or the
+ // concurrent map.
+ template <typename FunctorT>
+ inline void invokeOnExistenceMapFinal(const FunctorT &functor) const {
+ if (use_thread_private_existence_map_) {
+ const BoolVector *existence_map =
+ thread_private_existence_map_pool_->getAll().front().get();
+ functor(existence_map);
+ } else {
+ functor(concurrent_existence_map_.get());
+ }
+ }
+
+ // Upserts one unary aggregate from key/argument accessors into atomic states.
+ template <typename AggFuncT, typename KeyT, typename ArgT,
+ bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+ typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
+ inline void upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ BoolVectorT *existence_map,
+ KeyAccessorT *key_accessor,
+ ArgAccessorT *argument_accessor);
+
+ // Same as upsertValueAccessorInternalUnaryAtomic(), but merges into plain
+ // states while holding a per-group SpinMutex.
+ template <typename AggFuncT, typename KeyT, typename ArgT,
+ bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+ typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
+ inline void upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ BoolVectorT *existence_map,
+ KeyAccessorT *key_accessor,
+ ArgAccessorT *argument_accessor);
+
+ // Finalization helpers: each visits positions in [start_position,
+ // end_position) whose existence-map entry is set and writes one value per
+ // such position through output_cv->getPtrForDirectWrite().
+ template <typename KeyT, typename BoolVectorT>
inline void finalizeKeyInternal(const std::size_t start_position,
const std::size_t end_position,
+ BoolVectorT *existence_map,
NativeColumnVector *output_cv) const;
- template <typename ...ArgTypes>
- inline void finalizeStateDispatchHelper(const AggregationID agg_id,
- const Type *argument_type,
+ template <typename AggFuncT, typename ArgT, typename BoolVectorT>
+ inline void finalizeStateInternalAtomic(const std::size_t start_position,
+ const std::size_t end_position,
const void *vec_table,
- ArgTypes &&...args) const;
-
- template <typename ...ArgTypes>
- inline void finalizeStateSumHelper(const Type *argument_type,
- const void *vec_table,
- ArgTypes &&...args) const;
-
- inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const;
+ BoolVectorT *existence_map,
+ NativeColumnVector *output_cv) const;
- template <typename ResultT, typename StateT>
- inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const;
+ template <typename AggFuncT, typename ArgT, typename BoolVectorT>
+ inline void finalizeStateInternalLatch(const std::size_t start_position,
+ const std::size_t end_position,
+ const void *vec_table,
+ BoolVectorT *existence_map,
+ NativeColumnVector *output_cv) const;
const Type *key_type_;
const std::size_t num_entries_;
@@ -330,8 +297,12 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
const std::size_t num_handles_;
const std::vector<AggregationHandle *> handles_;
- std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
+ // Selects which existence-map representation below is consulted.
+ const bool use_thread_private_existence_map_;
+ std::unique_ptr<BarrieredReadWriteConcurrentBoolVector> concurrent_existence_map_;
+ std::unique_ptr<BoolVectorPool> thread_private_existence_map_pool_;
+
std::vector<void *> vec_tables_;
+ // Per-group latches indexed by key, used by the latch upsert path.
+ // NOTE(review): raw pointer; ownership and release are not visible here.
+ SpinMutex *mutex_vec_;
const std::size_t num_finalize_partitions_;
@@ -347,392 +318,144 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
// ----------------------------------------------------------------------------
// Implementations of template methods follow.
-template <bool use_two_accessors, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
- const bool is_argument_nullable,
- ArgTypes &&...args) {
- if (is_key_nullable) {
- if (is_argument_nullable) {
- upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
- std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
- std::forward<ArgTypes>(args)...);
- }
- } else {
- if (is_argument_nullable) {
- upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
- std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
- std::forward<ArgTypes>(args)...);
- }
- }
-}
-
-template <bool ...bool_values, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorDispatchHelper(const Type *key_type,
- ArgTypes &&...args) {
- switch (key_type->getTypeID()) {
- case TypeID::kInt:
- upsertValueAccessorDispatchHelper<bool_values..., int>(
- std::forward<ArgTypes>(args)...);
- return;
- case TypeID::kLong:
- upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ...ArgTypes>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorDispatchHelper(const Type *argument_type,
- const AggregationID agg_id,
- ArgTypes &&...args) {
- switch (agg_id) {
- case AggregationID::kCount:
- upsertValueAccessorCountHelper<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
- std::forward<ArgTypes>(args)...);
- return;
- case AggregationID::kSum:
- upsertValueAccessorSumHelper<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
- argument_type, std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
- const Type *key_type,
- ArgTypes &&...args) {
- switch (key_type->getTypeID()) {
- case TypeID::kInt: {
- if (is_key_nullable) {
- upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...);
- }
- return;
- }
- case TypeID::kLong: {
- if (is_key_nullable) {
- upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...);
- } else {
- upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...);
- }
- return;
- }
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
- ValueAccessorT *accessor) {
- accessor->beginIteration();
- while (accessor->next()) {
- const KeyT *key = static_cast<const KeyT *>(
- accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- existence_map_->setBit(*key);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- DCHECK_GE(key_attr_id, 0);
-
- if (is_argument_nullable && argument_id != kInvalidAttributeID) {
- upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::size_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- } else {
- upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
- key_attr_id,
- static_cast<std::atomic<std::size_t> *>(vec_table),
- key_accessor);
- return;
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+// Atomic upsert: advances the key accessor (and, with use_two_accessors, the
+// argument accessor alongside it), marks each non-null key in existence_map, and
+// merges each non-null argument into states[key] via AggFuncT::MergeArgAtomic.
+// NOTE(review): the key value is used directly as an index with no range check.
+template <typename AggFuncT, typename KeyT, typename ArgT,
+ bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+ typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
inline void CollisionFreeVectorTable
- ::upsertValueAccessorSumHelper(const Type *argument_type,
- const attribute_id key_attr_id,
- const attribute_id argument_id,
- void *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- DCHECK_GE(key_attr_id, 0);
- DCHECK_GE(argument_id, 0);
- DCHECK(argument_type != nullptr);
-
- switch (argument_type->getTypeID()) {
- case TypeID::kInt:
- upsertValueAccessorIntegerSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::int64_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kLong:
- upsertValueAccessorIntegerSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<std::int64_t> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kFloat:
- upsertValueAccessorGenericSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<double> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- case TypeID::kDouble:
- upsertValueAccessorGenericSum<
- use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>(
- key_attr_id,
- argument_id,
- static_cast<std::atomic<double> *>(vec_table),
- key_accessor,
- argument_accessor);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
+ ::upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ BoolVectorT *existence_map,
+ KeyAccessorT *key_accessor,
+ ArgAccessorT *argument_accessor) {
+ auto *states = static_cast<
+ typename AggFuncT::template AggState<ArgT>::AtomicT *>(vec_table);
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
- std::atomic<std::size_t> *vec_table,
- ValueAccessorT *accessor) {
- accessor->beginIteration();
- while (accessor->next()) {
- const KeyT *key = static_cast<const KeyT *>(
- accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
- existence_map_->setBit(loc);
- }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<std::size_t> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
key_accessor->beginIteration();
if (use_two_accessors) {
argument_accessor->beginIteration();
}
- while (key_accessor->next()) {
- if (use_two_accessors) {
- argument_accessor->next();
- }
- const KeyT *key = static_cast<const KeyT *>(
- key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
- if (is_key_nullable && key == nullptr) {
- continue;
- }
- const std::size_t loc = *key;
- existence_map_->setBit(loc);
- if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
- continue;
- }
- vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
- }
-}
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
- ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
- key_accessor->beginIteration();
- if (use_two_accessors) {
- argument_accessor->beginIteration();
- }
while (key_accessor->next()) {
if (use_two_accessors) {
argument_accessor->next();
}
- const KeyT *key = static_cast<const KeyT *>(
+
+ const auto *key = static_cast<const typename KeyT::cpptype *>(
key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
if (is_key_nullable && key == nullptr) {
continue;
}
const std::size_t loc = *key;
- existence_map_->setBit(loc);
- const ArgumentT *argument = static_cast<const ArgumentT *>(
+ existence_map->set(loc);
+
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
if (is_argument_nullable && argument == nullptr) {
continue;
}
- vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+
+ AggFuncT::template MergeArgAtomic<ArgT>(*argument, states + loc);
}
}
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
- typename KeyT, typename ArgumentT, typename StateT,
- typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+// Latch upsert: same scan as the atomic variant, but merges each non-null
+// argument into the plain state states[key] while holding mutex_vec_[key].
+template <typename AggFuncT, typename KeyT, typename ArgT,
+ bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+ typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
inline void CollisionFreeVectorTable
- ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
- const attribute_id argument_id,
- std::atomic<StateT> *vec_table,
- KeyValueAccessorT *key_accessor,
- ArgumentValueAccessorT *argument_accessor) {
+ ::upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id,
+ const attribute_id argument_id,
+ void *vec_table,
+ BoolVectorT *existence_map,
+ KeyAccessorT *key_accessor,
+ ArgAccessorT *argument_accessor) {
+ auto *states = static_cast<
+ typename AggFuncT::template AggState<ArgT>::T *>(vec_table);
+
key_accessor->beginIteration();
if (use_two_accessors) {
argument_accessor->beginIteration();
}
+
while (key_accessor->next()) {
if (use_two_accessors) {
argument_accessor->next();
}
- const KeyT *key = static_cast<const KeyT *>(
+
+ const auto *key = static_cast<const typename KeyT::cpptype *>(
key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
if (is_key_nullable && key == nullptr) {
continue;
}
const std::size_t loc = *key;
- existence_map_->setBit(loc);
- const ArgumentT *argument = static_cast<const ArgumentT *>(
+ existence_map->set(loc);
+
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
if (is_argument_nullable && argument == nullptr) {
continue;
}
- const ArgumentT arg_val = *argument;
- std::atomic<StateT> &state = vec_table[loc];
- StateT state_val = state.load(std::memory_order_relaxed);
- while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+
+ SpinMutexLock lock(mutex_vec_[loc]);
+ AggFuncT::template MergeArgUnsafe<ArgT>(*argument, states + loc);
}
}
-template <typename KeyT>
+// Writes each position in [start_position, end_position) whose existence-map
+// entry is set into output_cv as a KeyT (the position is the key value).
+template <typename KeyT, typename BoolVectorT>
inline void CollisionFreeVectorTable
::finalizeKeyInternal(const std::size_t start_position,
const std::size_t end_position,
+ BoolVectorT *existence_map,
NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+ for (std::size_t loc = start_position; loc < end_position; ++loc) {
+ if (existence_map->get(loc)) {
+ *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+ }
}
}
-template <typename ...ArgTypes>
+// For each existing group in [start_position, end_position), finalizes its
+// atomic state with AggFuncT::FinalizeAtomic into the next output_cv slot.
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
inline void CollisionFreeVectorTable
- ::finalizeStateDispatchHelper(const AggregationID agg_id,
- const Type *argument_type,
+ ::finalizeStateInternalAtomic(const std::size_t start_position,
+ const std::size_t end_position,
const void *vec_table,
- ArgTypes &&...args) const {
- switch (agg_id) {
- case AggregationID::kCount:
- finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- case AggregationID::kSum:
- finalizeStateSumHelper(argument_type,
- vec_table,
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeVectorTable
- ::finalizeStateSumHelper(const Type *argument_type,
- const void *vec_table,
- ArgTypes &&...args) const {
- DCHECK(argument_type != nullptr);
-
- switch (argument_type->getTypeID()) {
- case TypeID::kInt: // Fall through
- case TypeID::kLong:
- finalizeStateSum<std::int64_t>(
- static_cast<const std::atomic<std::int64_t> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- case TypeID::kFloat: // Fall through
- case TypeID::kDouble:
- finalizeStateSum<double>(
- static_cast<const std::atomic<double> *>(vec_table),
- std::forward<ArgTypes>(args)...);
- return;
- default:
- LOG(FATAL) << "Not supported";
+ BoolVectorT *existence_map,
+ NativeColumnVector *output_cv) const {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+ using ResultT = typename StateT::ResultT;
+
+ const auto *states = static_cast<const typename StateT::AtomicT *>(vec_table);
+
+ for (std::size_t loc = start_position; loc < end_position; ++loc) {
+ if (existence_map->get(loc)) {
+ AggFuncT::template FinalizeAtomic<ArgT>(
+ states[loc],
+ static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+ }
}
}
+// Same as finalizeStateInternalAtomic(), but reads the plain (latch-protected)
+// states and finalizes them with AggFuncT::FinalizeUnsafe.
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
inline void CollisionFreeVectorTable
- ::finalizeStateCount(const std::atomic<std::size_t> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
- vec_table[loc].load(std::memory_order_relaxed);
+ ::finalizeStateInternalLatch(const std::size_t start_position,
+ const std::size_t end_position,
+ const void *vec_table,
+ BoolVectorT *existence_map,
+ NativeColumnVector *output_cv) const {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+ using ResultT = typename StateT::ResultT;
+
+ const auto *states = static_cast<const typename StateT::T *>(vec_table);
+
+ for (std::size_t loc = start_position; loc < end_position; ++loc) {
+ if (existence_map->get(loc)) {
+ AggFuncT::template FinalizeUnsafe<ArgT>(
+ states[loc],
+ static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+ }
}
}
-template <typename ResultT, typename StateT>
-inline void CollisionFreeVectorTable
- ::finalizeStateSum(const std::atomic<StateT> *vec_table,
- const std::size_t start_position,
- const std::size_t end_position,
- NativeColumnVector *output_cv) const {
- std::size_t loc = start_position - 1;
- while ((loc = existence_map_->nextOne(loc)) < end_position) {
- *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
- vec_table[loc].load(std::memory_order_relaxed);
- }
-}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..bd7e960 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -256,6 +256,10 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
const std::size_t retry_num) {
+ // Diagnostic trace only (visible with --v=1). LOG(FATAL) here would abort the
+ // process on every resize, which is a normal, recoverable code path.
+ VLOG(1) << "Resize " << numEntries() << " + "
+ << extra_buckets << " + " << extra_variable_storage
+ << " -- " << header_->num_buckets;
+
// A retry should never be necessary with this implementation of HashTable.
// Separate chaining ensures that any resized hash table with more buckets
// than the original table will be able to hold more entries than the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..9ba5500 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -95,6 +95,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
void destroyPayload() override;
+ /**
+ * @brief Get the number of finalization partitions, derived from
+ * numEntries() by CalculateNumFinalizationPartitions().
+ **/
+ inline std::size_t getNumFinalizationPartitions() const {
+ return CalculateNumFinalizationPartitions(numEntries());
+ }
+
/**
* @brief Use aggregation handles to update (multiple) aggregation states in
* this hash table, with group-by keys and arguments drawn from the
@@ -287,6 +291,11 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
template <typename FunctorT>
inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+ /**
+ * @brief Apply functor to each composite key (passed as a
+ * std::vector<TypedValue>) stored in the buckets of the given
+ * finalization partition.
+ **/
+ template <typename FunctorT>
+ inline void forEachCompositeKeyInPartition(
+ const std::size_t partition_id,
+ const FunctorT &functor) const;
+
/**
* @brief Apply a functor to each (key, aggregation state) pair in this hash
* table, where the aggregation state is retrieved from the value
@@ -328,6 +337,25 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
return total;
}
+ // Number of buckets per finalization partition: ceiling division of
+ // numEntries() by getNumFinalizationPartitions().
+ inline std::size_t calculatePartitionLength() const {
+ const std::size_t num_finalize_partitions = getNumFinalizationPartitions();
+ const std::size_t partition_length =
+ (numEntries() + num_finalize_partitions - 1) / num_finalize_partitions;
+ DCHECK_GE(partition_length, 0u);  // NOTE(review): vacuous for an unsigned value.
+ return partition_length;
+ }
+
+ // First bucket index (inclusive) of the given partition.
+ inline std::size_t calculatePartitionStartPosition(
+ const std::size_t partition_id) const {
+ return calculatePartitionLength() * partition_id;
+ }
+
+ // One past the last bucket index of the given partition, capped at numEntries().
+ inline std::size_t calculatePartitionEndPosition(
+ const std::size_t partition_id) const {
+ return std::min(calculatePartitionLength() * (partition_id + 1),
+ numEntries());
+ }
+
inline bool getNextEntry(TypedValue *key,
const std::uint8_t **value,
std::size_t *entry_num) const;
@@ -438,6 +466,15 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
kBucketAlignment;
}
+ // Number of finalization partitions for num_entries buckets: one per
+ // kFinalizeSegmentSize entries, clamped to [1, 80].
+ inline static std::size_t CalculateNumFinalizationPartitions(
+ const std::size_t num_entries) {
+ // Set finalization segment size as 4096 entries.
+ constexpr std::size_t kFinalizeSegmentSize = 4u * 1024u;
+
+ // At least 1 partition, at most 80 partitions. Explicit template arguments keep
+ // std::min/std::max well-formed where std::size_t is not unsigned long.
+ return std::max<std::size_t>(
+ 1u, std::min<std::size_t>(num_entries / kFinalizeSegmentSize, 80u));
+ }
+
// Attempt to find an empty bucket to insert 'hash_code' into, starting after
// '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
// array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
@@ -975,6 +1012,29 @@ inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
}
template <typename FunctorT>
+inline void PackedPayloadHashTable::forEachCompositeKeyInPartition(
+ const std::size_t partition_id,
+ const FunctorT &functor) const {
+ const std::size_t start_position =
+ calculatePartitionStartPosition(partition_id);
+ const std::size_t end_position =
+ calculatePartitionEndPosition(partition_id);
+
+ // Reused across buckets; cleared after each functor call.
+ // NOTE(review): assumes every bucket index in [0, numEntries()) holds a live
+ // entry; confirm against the bucket allocation scheme.
+ std::vector<TypedValue> key;
+ for (std::size_t i = start_position; i < end_position; ++i) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + i * bucket_size_;
+ for (std::vector<const Type *>::size_type key_idx = 0;
+ key_idx < this->key_types_.size();
+ ++key_idx) {
+ key.emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+ }
+ functor(key);
+ key.clear();
+ }
+}
+
+template <typename FunctorT>
inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
FunctorT *functor,
const std::size_t index) const {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/BoolVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BoolVector.hpp b/utility/BoolVector.hpp
new file mode 100644
index 0000000..1f16fc7
--- /dev/null
+++ b/utility/BoolVector.hpp
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+#define QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <new>
+#include <vector>
+
+#include "threading/SpinMutex.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ * @{
+ */
+
+/**
+ * @brief A fixed-length vector of bools, stored one bool per element (not
+ *        bit-packed).
+ *
+ * The storage is either allocated and owned by this object (length-only
+ * constructor) or supplied by the caller (memory-location constructor).
+ * No synchronization is performed.
+ **/
+class BoolVector {
+ public:
+  /**
+   * @brief Constructor over caller-provided storage, which is NOT owned and is
+   *        not freed by the destructor.
+   *
+   * @param memory_location Start of a buffer of at least
+   *        sizeof(bool) * length bytes.
+   * @param length Number of elements; must be positive.
+   * @param initialize If true, set every element to false; otherwise the
+   *        buffer's current contents are used as-is.
+   **/
+  BoolVector(void *memory_location,
+             const std::size_t length,
+             const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<bool *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  /**
+   * @brief Constructor that allocates (and owns) storage for 'length'
+   *        elements, all initialized to false.
+   *
+   * @param length Number of elements; must be positive.
+   **/
+  explicit BoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<bool *>(std::malloc(sizeof(bool) * length))) {
+    DCHECK_GT(length, 0u);
+    // Fail loudly instead of memset-ing through a null pointer below.
+    CHECK(data_array_ != nullptr)
+        << "Failed to allocate BoolVector of length " << length;
+    clear();
+  }
+
+  ~BoolVector() {
+    if (owned_) {
+      std::free(data_array_);
+    }
+  }
+
+  /**
+   * @brief Set every element to false.
+   **/
+  inline void clear() {
+    std::memset(data_array_, 0, sizeof(bool) * length_);
+  }
+
+  /**
+   * @brief Set the element at 'loc' to true.
+   **/
+  inline void set(const std::size_t loc) {
+    data_array_[loc] = true;
+  }
+
+  /**
+   * @brief Get the element at 'loc'.
+   **/
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc];
+  }
+
+  /**
+   * @brief OR the elements of 'other' into this vector over the half-open
+   *        range [start_position, end_position).
+   *
+   * @note Declared const for interface compatibility even though it writes
+   *       through data_array_; do not rely on this call leaving the vector
+   *       unchanged.
+   **/
+  inline void unionWith(const BoolVector &other,
+                        const std::size_t start_position,
+                        const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LE(end_position, length_);
+    DCHECK_LE(end_position, other.length_);
+
+    for (std::size_t loc = start_position; loc < end_position; ++loc) {
+      data_array_[loc] |= other.data_array_[loc];
+    }
+  }
+
+  /**
+   * @brief Count the elements set to true in the half-open range
+   *        [start_position, end_position).
+   *
+   * An empty range (including one starting at length()) yields 0.
+   **/
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    // start_position <= end_position <= length_ already bounds the start;
+    // a stricter start_position < length_ check would reject the valid empty
+    // range [length_, length_).
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i];
+    }
+    return count;
+  }
+
+ private:
+  const bool owned_;
+  const std::size_t length_;
+  bool *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVector);
+};
+
+/**
+ * @brief A pool of BoolVectors of one fixed length, letting callers recycle
+ *        vectors instead of allocating new ones.
+ *
+ * checkOut() and checkIn() are guarded by a spin mutex; getAll() is not.
+ **/
+class BoolVectorPool {
+ public:
+  /**
+   * @param vector_length Length of every BoolVector this pool creates.
+   **/
+  explicit BoolVectorPool(const std::size_t vector_length)
+      : vector_length_(vector_length) {}
+
+  /**
+   * @brief Take a BoolVector from the pool, or allocate a new (all-false) one
+   *        if the pool is empty.
+   *
+   * A recycled vector is returned with whatever contents it had when it was
+   * checked in; it is NOT cleared here.
+   *
+   * @return A vector owned by the caller until it is passed to checkIn().
+   **/
+  BoolVector* checkOut() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!pool_.empty()) {
+        BoolVector *ret = pool_.back().release();
+        pool_.pop_back();
+        return ret;
+      }
+    }
+    // Allocate outside the critical section.
+    return new BoolVector(vector_length_);
+  }
+
+  /**
+   * @brief Return a BoolVector to the pool, which takes ownership of it.
+   *
+   * @param bool_vector A vector previously obtained from checkOut().
+   **/
+  void checkIn(BoolVector *bool_vector) {
+    SpinMutexLock lock(mutex_);
+    // Wrap the pointer in a unique_ptr before it reaches emplace_back: if
+    // growing 'pool_' throws, the temporary still owns and deletes the
+    // vector instead of leaking it.
+    pool_.emplace_back(std::unique_ptr<BoolVector>(bool_vector));
+  }
+
+  /**
+   * @brief Access all vectors currently held by the pool.
+   *
+   * @warning Not synchronized: must not be called concurrently with
+   *          checkOut() or checkIn().
+   **/
+  std::vector<std::unique_ptr<BoolVector>>& getAll() {
+    return pool_;
+  }
+
+  /**
+   * @brief Const access to all vectors currently held by the pool.
+   *
+   * @warning Not synchronized: must not be called concurrently with
+   *          checkOut() or checkIn().
+   **/
+  const std::vector<std::unique_ptr<BoolVector>>& getAll() const {
+    return pool_;
+  }
+
+ private:
+  const std::size_t vector_length_;
+
+  SpinMutex mutex_;
+  std::vector<std::unique_ptr<BoolVector>> pool_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVectorPool);
+};
+
+/**
+ * @brief A fixed-length vector of std::atomic<bool> flags whose elements may
+ *        be set and read concurrently.
+ *
+ * set(), get() and onesCountInRange() use std::memory_order_relaxed, so they
+ * are atomic but impose no inter-thread ordering. Presumably readers rely on
+ * an external barrier between the write and read phases (as the class name
+ * suggests) — NOTE(review): confirm with callers.
+ **/
+class BarrieredReadWriteConcurrentBoolVector {
+ public:
+  /**
+   * @brief Constructor over caller-provided storage, which is NOT owned and is
+   *        not freed by the destructor.
+   *
+   * @param memory_location Start of a buffer of at least BytesNeeded(length)
+   *        bytes, suitably aligned for std::atomic<bool>.
+   * @param length Number of elements; must be positive.
+   * @param initialize If true, construct every element as false. If false,
+   *        the buffer must already hold valid std::atomic<bool> elements.
+   **/
+  BarrieredReadWriteConcurrentBoolVector(void *memory_location,
+                                         const std::size_t length,
+                                         const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<DataType *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      constructElements();
+    }
+  }
+
+  /**
+   * @brief Constructor that allocates (and owns) storage for 'length'
+   *        elements, all initialized to false.
+   *
+   * @param length Number of elements; must be positive.
+   **/
+  explicit BarrieredReadWriteConcurrentBoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<DataType *>(std::malloc(BytesNeeded(length)))) {
+    DCHECK_GT(length, 0u);
+    // Fail loudly instead of constructing elements through a null pointer.
+    CHECK(data_array_ != nullptr)
+        << "Failed to allocate BarrieredReadWriteConcurrentBoolVector of length "
+        << length;
+    constructElements();
+  }
+
+  ~BarrieredReadWriteConcurrentBoolVector() {
+    if (owned_) {
+      // std::atomic<bool> has a trivial destructor, so the storage can be
+      // released without per-element destructor calls.
+      std::free(data_array_);
+    }
+  }
+
+  /**
+   * @brief Number of bytes of storage needed for 'length' elements.
+   **/
+  inline static std::size_t BytesNeeded(const std::size_t length) {
+    return kDataSize * length;
+  }
+
+  /**
+   * @brief Set every element to false.
+   *
+   * Uses per-element atomic stores rather than memset: writing raw bytes over
+   * std::atomic objects is undefined behavior (and GCC flags it with
+   * -Wclass-memaccess).
+   **/
+  inline void clear() {
+    for (std::size_t i = 0; i < length_; ++i) {
+      data_array_[i].store(false, std::memory_order_relaxed);
+    }
+  }
+
+  /**
+   * @brief Set the element at 'loc' to true (relaxed atomic store).
+   **/
+  inline void set(const std::size_t loc) {
+    data_array_[loc].store(true, std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Get the element at 'loc' (relaxed atomic load).
+   **/
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc].load(std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Count the elements set to true in the half-open range
+   *        [start_position, end_position).
+   *
+   * An empty range (including one starting at the vector's length) yields 0.
+   **/
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    // start_position <= end_position <= length_ already bounds the start;
+    // a stricter start_position < length_ check would reject the valid empty
+    // range [length_, length_).
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i].load(std::memory_order_relaxed);
+    }
+    return count;
+  }
+
+ private:
+  typedef std::atomic<bool> DataType;
+  static constexpr std::size_t kDataSize = sizeof(DataType);
+
+  // Begin the lifetime of every element as 'false' in raw storage via
+  // placement new, so that later atomic operations act on real objects.
+  inline void constructElements() {
+    for (std::size_t i = 0; i < length_; ++i) {
+      new (data_array_ + i) DataType(false);
+    }
+  }
+
+  const bool owned_;
+  const std::size_t length_;
+  DataType *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BarrieredReadWriteConcurrentBoolVector);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..e9a978e 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
add_library(quickstep_utility_BloomFilter_proto
${quickstep_utility_BloomFilter_proto_srcs}
${quickstep_utility_BloomFilter_proto_hdrs})
+add_library(quickstep_utility_BoolVector ../empty_src.cpp BoolVector.hpp)
add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -233,6 +234,9 @@ target_link_libraries(quickstep_utility_BloomFilter
quickstep_utility_Macros)
target_link_libraries(quickstep_utility_BloomFilter_proto
${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_utility_BoolVector
+                      glog
+                      quickstep_threading_SpinMutex
+                      quickstep_utility_Macros)
target_link_libraries(quickstep_utility_CalculateInstalledMemory
glog)
target_link_libraries(quickstep_utility_CheckSnprintf
@@ -341,6 +345,7 @@ target_link_libraries(quickstep_utility
quickstep_utility_BitVector
quickstep_utility_BloomFilter
quickstep_utility_BloomFilter_proto
+ quickstep_utility_BoolVector
quickstep_utility_CalculateInstalledMemory
quickstep_utility_Cast
quickstep_utility_CheckSnprintf