You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/28 19:34:21 UTC
[08/11] incubator-quickstep git commit: Support for performing
partitioned aggregation.
Support for performing partitioned aggregation.
- Used for creating a pool of hash tables such that each hash table
belongs to a unique partition.
- The partitioning is done on the group-by keys.
- Wrote a utility function to compute composite hash of a group of
TypedValues.
- Added a check for whether the aggregation is partitioned or not.
- The conditions for whether the aggregation can be partitioned
are as follows:
1. The query has a GROUP BY clause.
2. There are no aggregations with a DISTINCT clause.
3. The estimated number of groups is greater than a pre-defined
threshold.
4. The query has at least one aggregation function.
- Method for partitioned aggregation with GROUP BY
- StorageBlock now provides a method for performing GROUP BY aggregation
in a partitioned way.
- The Tuple class now supports a method to compute the hash of the entire
tuple (i.e. hash key is the composite key made up of all the
attributes in the tuple).
- AggregationOperationState calls appropriate method (i.e.
aggregateGroupBy or aggregateGroupByPartitioned) based on the way in
which aggregation is being performed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e1434ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e1434ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e1434ad
Branch: refs/heads/exact-filter
Commit: 1e1434ad41db59009b4f30578f3832ff62799b35
Parents: 393eba5
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Oct 26 09:09:52 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Oct 26 09:10:09 2016 -0500
----------------------------------------------------------------------
.../FinalizeAggregationOperator.cpp | 30 ++-
.../FinalizeAggregationOperator.hpp | 9 +-
storage/AggregationOperationState.cpp | 195 +++++++++++++++---
storage/AggregationOperationState.hpp | 74 ++++++-
storage/CMakeLists.txt | 12 ++
storage/HashTablePool.hpp | 2 +-
storage/PartitionedHashTablePool.hpp | 204 +++++++++++++++++++
storage/StorageBlock.cpp | 55 +++++
storage/StorageBlock.hpp | 50 +++++
types/containers/CMakeLists.txt | 1 +
types/containers/Tuple.hpp | 8 +
utility/CMakeLists.txt | 6 +
utility/CompositeHash.hpp | 52 +++++
13 files changed, 657 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..0cbf635 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,16 +41,27 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
if (blocking_dependencies_met_ && !started_) {
started_ = true;
- container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(
- query_id_,
- query_context->getAggregationState(aggr_state_index_),
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_);
+ DCHECK(agg_state != nullptr);
+ for (int part_id = 0;
+ part_id < static_cast<int>(agg_state->getNumPartitions());
+ ++part_id) {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ agg_state,
+ query_context->getInsertDestination(output_destination_index_),
+ part_id),
+ op_index_);
+ }
}
return started_;
}
+// TODO(quickstep-team) : Think about how the number of partitions could be
+// accessed in this function. Until then, we can't use partitioned aggregation
+// with the distributed version.
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (blocking_dependencies_met_ && !started_) {
started_ = true;
@@ -68,9 +79,12 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
return started_;
}
-
void FinalizeAggregationWorkOrder::execute() {
- state_->finalizeAggregate(output_destination_);
+ if (state_->isAggregatePartitioned()) {
+ state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+ } else {
+ state_->finalizeAggregate(output_destination_);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..ae7127a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,17 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
+ * @param part_id The partition ID for which the Finalize aggregation work
+ * order is issued. Ignored if the aggregation is not partitioned.
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
AggregationOperationState *state,
- InsertDestination *output_destination)
+ InsertDestination *output_destination,
+ const int part_id = -1)
: WorkOrder(query_id),
state_(DCHECK_NOTNULL(state)),
- output_destination_(DCHECK_NOTNULL(output_destination)) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ part_id_(part_id) {}
~FinalizeAggregationWorkOrder() override {}
@@ -134,6 +138,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
private:
AggregationOperationState *state_;
InsertDestination *output_destination_;
+ const int part_id_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index eb7ca79..b942c1b 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -60,6 +60,14 @@ using std::unique_ptr;
namespace quickstep {
+DEFINE_int32(num_aggregation_partitions,
+ 41,
+ "The number of partitions used for performing the aggregation");
+DEFINE_int32(partition_aggregation_num_groups_threshold,
+ 500000,
+ "The threshold used for deciding whether the aggregation is done "
+ "in a partitioned way or not");
+
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -72,6 +80,8 @@ AggregationOperationState::AggregationOperationState(
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
+ is_aggregate_partitioned_(checkAggregatePartitioned(
+ estimated_num_entries, is_distinct, group_by, aggregate_functions)),
predicate_(predicate),
group_by_list_(std::move(group_by)),
arguments_(std::move(arguments)),
@@ -175,11 +185,10 @@ AggregationOperationState::AggregationOperationState(
key_types.insert(
key_types.end(), argument_types.begin(), argument_types.end());
// TODO(jianqiao): estimated_num_entries is quite inaccurate for
- // estimating
- // the number of entries in the distinctify hash table. We may estimate
- // for each distinct aggregation an estimated_num_distinct_keys value
- // during
- // query optimization, if it worths.
+ // estimating the number of entries in the distinctify hash table.
+ // We may estimate for each distinct aggregation an
+ // estimated_num_distinct_keys value during query optimization, if it's
+ // worth it.
distinctify_hashtables_.emplace_back(
AggregationStateFastHashTableFactory::CreateResizable(
*distinctify_hash_table_impl_types_it,
@@ -195,14 +204,24 @@ AggregationOperationState::AggregationOperationState(
}
if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group
- // states.
- group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager));
+ // Aggregation with GROUP BY: create a HashTable pool.
+ if (!is_aggregate_partitioned_) {
+ group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager));
+ } else {
+ partitioned_group_by_hashtable_pool_.reset(
+ new PartitionedHashTablePool(estimated_num_entries,
+ FLAGS_num_aggregation_partitions,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager));
+ }
}
}
}
@@ -450,19 +469,71 @@ void AggregationOperationState::aggregateBlockHashTable(
}
}
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pool_ != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
+ if (!is_aggregate_partitioned_) {
+ // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+ // directly into the (threadsafe) shared global HashTable for this
+ // aggregate.
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+ AggregationStateHashTableBase *agg_hash_table =
group_by_hashtable_pool_->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupBy(arguments_,
- group_by_list_,
- matches.get(),
- agg_hash_table,
- &reuse_group_by_vectors);
- group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ DCHECK(agg_hash_table != nullptr);
+ block->aggregateGroupBy(arguments_,
+ group_by_list_,
+ matches.get(),
+ agg_hash_table,
+ &reuse_group_by_vectors);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ } else {
+ ColumnVectorsValueAccessor temp_result;
+ // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+ std::vector<attribute_id> argument_ids;
+
+ // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+ std::vector<attribute_id> key_ids;
+ const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+ block->aggregateGroupByPartitioned(
+ arguments_,
+ group_by_list_,
+ matches.get(),
+ num_partitions,
+ &temp_result,
+ &argument_ids,
+ &key_ids,
+ &reuse_group_by_vectors);
+ // Compute the partitions for the tuple formed by group by values.
+ std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+ partition_membership.resize(num_partitions);
+
+ // Create a tuple-id sequence for each partition.
+ for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+ partition_membership[partition].reset(
+ new TupleIdSequence(temp_result.getEndPosition()));
+ }
+
+ // Iterate over ValueAccessor for each tuple,
+ // set a bit in the appropriate TupleIdSequence.
+ temp_result.beginIteration();
+ while (temp_result.next()) {
+ // We need a unique_ptr because getTupleWithAttributes() uses "new".
+ std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+ const std::size_t curr_tuple_partition_id =
+ curr_tuple->getTupleHash() % num_partitions;
+ partition_membership[curr_tuple_partition_id]->set(
+ temp_result.getCurrentPosition(), true);
+ }
+ // For each partition, create an adapter around Value Accessor and
+ // TupleIdSequence.
+ std::vector<std::unique_ptr<
+ TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+ adapter.resize(num_partitions);
+ for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+ adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+ *(partition_membership)[partition]));
+ partitioned_group_by_hashtable_pool_->getHashTable(partition)
+ ->upsertValueAccessorCompositeKeyFast(
+ argument_ids, adapter[partition].get(), key_ids, true);
+ }
+ }
}
void AggregationOperationState::finalizeSingleState(
@@ -606,13 +677,81 @@ void AggregationOperationState::finalizeHashTable(
}
void AggregationOperationState::destroyAggregationHashTablePayload() {
- if (group_by_hashtable_pool_ != nullptr) {
- auto all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(all_hash_tables != nullptr);
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
+ nullptr;
+ if (!is_aggregate_partitioned_) {
+ if (group_by_hashtable_pool_ != nullptr) {
+ all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ }
+ } else {
+ if (partitioned_group_by_hashtable_pool_ != nullptr) {
+ all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
+ }
+ }
+ if (all_hash_tables != nullptr) {
for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
(*all_hash_tables)[ht_index]->destroyPayload();
}
}
}
+void AggregationOperationState::finalizeAggregatePartitioned(
+ const std::size_t partition_id, InsertDestination *output_destination) {
+ // Each element of 'group_by_keys' is a vector of values for a particular
+ // group (which is also the prefix of the finalized Tuple for that group).
+ std::vector<std::vector<TypedValue>> group_by_keys;
+
+ // Collect per-aggregate finalized values.
+ std::vector<std::unique_ptr<ColumnVector>> final_values;
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+ AggregationStateHashTableBase *hash_table =
+ partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+ ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+ *hash_table, &group_by_keys, agg_idx);
+ if (agg_result_col != nullptr) {
+ final_values.emplace_back(agg_result_col);
+ }
+ }
+
+ // Reorganize 'group_by_keys' in column-major order so that we can make a
+ // ColumnVectorsValueAccessor to bulk-insert results.
+ //
+ // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+ // if there is only one aggregate. The need to do this should hopefully go
+ // away when we work out storing composite structures for multiple aggregates
+ // in a single HashTable.
+ std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+ std::size_t group_by_element_idx = 0;
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+ const Type &group_by_type = group_by_element->getType();
+ if (NativeColumnVector::UsableForType(group_by_type)) {
+ NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ group_by_cvs.emplace_back(element_cv);
+ for (std::vector<TypedValue> &group_key : group_by_keys) {
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ }
+ } else {
+ IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ group_by_cvs.emplace_back(element_cv);
+ for (std::vector<TypedValue> &group_key : group_by_keys) {
+ element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ }
+ }
+ ++group_by_element_idx;
+ }
+
+ // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+ // and the finalized aggregates.
+ ColumnVectorsValueAccessor complete_result;
+ for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+ complete_result.addColumn(group_by_cv.release());
+ }
+ for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+ complete_result.addColumn(final_value_cv.release());
+ }
+
+ // Bulk-insert the complete result.
+ output_destination->bulkInsertTuples(&complete_result);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c251983..e0826b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -32,9 +32,12 @@
#include "storage/AggregationOperationState.pb.h"
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "gflags/gflags.h"
+
namespace quickstep {
class AggregateFunction;
@@ -44,6 +47,9 @@ class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
+DECLARE_int32(num_aggregation_partitions);
+DECLARE_int32(partition_aggregation_num_groups_threshold);
+
/** \addtogroup Storage
* @{
*/
@@ -176,9 +182,39 @@ class AggregationOperationState {
**/
void destroyAggregationHashTablePayload();
+ /**
+ * @brief Generate the final results for the aggregates managed by this
+ * AggregationOperationState and write them out to StorageBlock(s).
+ * In this implementation, each thread picks a hash table belonging to
+ * a partition and writes its values to StorageBlock(s). There is no
+ * need to merge multiple hash tables in one, because there is no
+ * overlap in the keys across two hash tables.
+ *
+ * @param partition_id The ID of the partition for which finalize is being
+ * performed.
+ * @param output_destination An InsertDestination where the finalized output
+ * tuple(s) from this aggregate are to be written.
+ **/
+ void finalizeAggregatePartitioned(
+ const std::size_t partition_id, InsertDestination *output_destination);
+
static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
AggregationStateHashTableBase *dst);
+ bool isAggregatePartitioned() const {
+ return is_aggregate_partitioned_;
+ }
+
+ /**
+ * @brief Get the number of partitions to be used for the aggregation.
+ * For non-partitioned aggregations, we return 1.
+ **/
+ std::size_t getNumPartitions() const {
+ return is_aggregate_partitioned_
+ ? partitioned_group_by_hashtable_pool_->getNumPartitions()
+ : 1;
+ }
+
int dflag;
private:
@@ -195,12 +231,41 @@ class AggregationOperationState {
void finalizeSingleState(InsertDestination *output_destination);
void finalizeHashTable(InsertDestination *output_destination);
- // A vector of group by hash table pools.
- std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+ bool checkAggregatePartitioned(
+ const std::size_t estimated_num_groups,
+ const std::vector<bool> &is_distinct,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const std::vector<const AggregateFunction *> &aggregate_functions) const {
+ // If there's no aggregation, return false.
+ if (aggregate_functions.empty()) {
+ return false;
+ }
+ // Check if there's a distinct operation involved in any aggregate, if so
+ // the aggregate can't be partitioned.
+ for (auto distinct : is_distinct) {
+ if (distinct) {
+ return false;
+ }
+ }
+ // There's no distinct aggregation involved. Check if there's at least one
+ // GROUP BY operation.
+ if (group_by.empty()) {
+ return false;
+ }
+ // There are GROUP BYs without DISTINCT. Check if the estimated number of
+ // groups is large enough to warrant a partitioned aggregation.
+ return estimated_num_groups >
+ static_cast<std::size_t>(
+ FLAGS_partition_aggregation_num_groups_threshold);
+ }
// Common state for all aggregates in this operation: the input relation, the
// filter predicate (if any), and the list of GROUP BY expressions (if any).
const CatalogRelationSchema &input_relation_;
+
+ // Whether the aggregation is partitioned or not.
+ const bool is_aggregate_partitioned_;
+
std::unique_ptr<const Predicate> predicate_;
std::vector<std::unique_ptr<const Scalar>> group_by_list_;
@@ -233,6 +298,11 @@ class AggregationOperationState {
std::vector<std::unique_ptr<AggregationStateHashTableBase>>
group_by_hashtables_;
+ // A pool of hash tables used for GROUP BY aggregation.
+ std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+
+ std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 0e32cc1..be60662 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock
add_library(quickstep_storage_PackedRowStoreValueAccessor
../empty_src.cpp
PackedRowStoreValueAccessor.hpp)
+add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp)
@@ -270,6 +271,7 @@ add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_Wi
# Link dependencies:
target_link_libraries(quickstep_storage_AggregationOperationState
+ ${GFLAGS_LIB_NAME}
glog
quickstep_catalog_CatalogDatabaseLite
quickstep_catalog_CatalogRelationSchema
@@ -289,6 +291,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_HashTableFactory
quickstep_storage_HashTablePool
quickstep_storage_InsertDestination
+ quickstep_storage_PartitionedHashTablePool
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
@@ -852,6 +855,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor
quickstep_types_TypedValue
quickstep_utility_BitVector
quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+ glog
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_storage_FastHashTable
+ quickstep_storage_FastHashTableFactory
+ quickstep_storage_HashTableBase
+ quickstep_utility_Macros
+ quickstep_utility_StringUtil)
target_link_libraries(quickstep_storage_PreloaderThread
glog
quickstep_catalog_CatalogDatabase
@@ -1169,6 +1180,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_LinearOpenAddressingHashTable
quickstep_storage_PackedRowStoreTupleStorageSubBlock
quickstep_storage_PackedRowStoreValueAccessor
+ quickstep_storage_PartitionedHashTablePool
quickstep_storage_PreloaderThread
quickstep_storage_SMAIndexSubBlock
quickstep_storage_SeparateChainingHashTable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 3cdfcb3..96cf849 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -173,7 +173,7 @@ class HashTablePool {
* @param All the hash tables in the pool.
*
**/
- const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
getAllHashTables() {
return &hash_tables_;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
new file mode 100644
index 0000000..bcc4d23
--- /dev/null
+++ b/storage/PartitionedHashTablePool.hpp
@@ -0,0 +1,204 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+
+#include <algorithm>
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. Each
+ * HashTable represents values from a given partition, which is
+ * determined by the keys in the group by clause.
+ **/
+class PartitionedHashTablePool {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param estimated_num_entries Estimated total entries across all partitions.
+ * @param num_partitions The number of partitions (i.e. number of HashTables)
+ * @param hash_table_impl_type The type of hash table implementation.
+ * @param group_by_types A vector of pointer of types which form the group by
+ * key.
+ * @param agg_handle The aggregation handle.
+ * @param storage_manager A pointer to the storage manager.
+ **/
+ PartitionedHashTablePool(const std::size_t estimated_num_entries,
+ const std::size_t num_partitions,
+ const HashTableImplType hash_table_impl_type,
+ const std::vector<const Type *> &group_by_types,
+ AggregationHandle *agg_handle,
+ StorageManager *storage_manager)
+ : estimated_num_entries_(
+ setHashTableSize(estimated_num_entries, num_partitions)),
+ num_partitions_(num_partitions),
+ hash_table_impl_type_(hash_table_impl_type),
+ group_by_types_(group_by_types),
+ agg_handle_(DCHECK_NOTNULL(agg_handle)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ initializeAllHashTables();
+ }
+
+ /**
+ * @brief Constructor.
+ *
+ * @note This constructor is relevant for the HashTable specialized for
+ * aggregation.
+ *
+ * @param estimated_num_entries Estimated total entries across all partitions.
+ * @param num_partitions The number of partitions (i.e. number of HashTables)
+ * @param hash_table_impl_type The type of hash table implementation.
+ * @param group_by_types A vector of pointer of types which form the group by
+ * key.
+ * @param payload_sizes The sizes of the payload elements (i.e.
+ * AggregationStates).
+ * @param handles The aggregation handles.
+ * @param storage_manager A pointer to the storage manager.
+ **/
+ PartitionedHashTablePool(const std::size_t estimated_num_entries,
+ const std::size_t num_partitions,
+ const HashTableImplType hash_table_impl_type,
+ const std::vector<const Type *> &group_by_types,
+ const std::vector<std::size_t> &payload_sizes,
+ const std::vector<AggregationHandle *> &handles,
+ StorageManager *storage_manager)
+ : estimated_num_entries_(
+ setHashTableSize(estimated_num_entries, num_partitions)),
+ num_partitions_(num_partitions),
+ hash_table_impl_type_(hash_table_impl_type),
+ group_by_types_(group_by_types),
+ payload_sizes_(payload_sizes),
+ handles_(handles),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ initializeAllHashTables();
+ }
+
+ /**
+ * @brief Check out a hash table for insertion.
+ *
+ * @param partition_id The ID of the partitioned HashTable.
+ *
+ * @return A hash table pointer for the given partition.
+ **/
+ AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) {
+ DCHECK_LT(partition_id, num_partitions_);
+ DCHECK_LT(partition_id, hash_tables_.size());
+ return hash_tables_[partition_id].get();
+ }
+
+ /**
+ * @brief Get all the hash tables from the pool.
+ *
+ * @warning The caller should ensure that this call is made when no hash table
+ * is being checked in or checked out from the pool. In other words
+ * the hash table pool is in read-only state.
+ *
+ * @return All the hash tables in the pool.
+ *
+ **/
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+ getAllHashTables() {
+ return &hash_tables_;
+ }
+
+ /**
+ * @brief Get the number of partitions used for the aggregation.
+ **/
+ inline std::size_t getNumPartitions() const {
+ return num_partitions_;
+ }
+
+ private:
+ void initializeAllHashTables() {
+ for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
+ AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+ hash_tables_.push_back(
+ std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
+ }
+ }
+
+ AggregationStateHashTableBase* createNewHashTable() {
+ return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+ group_by_types_,
+ estimated_num_entries_,
+ storage_manager_);
+ }
+
+ AggregationStateHashTableBase* createNewHashTableFast() {
+ return AggregationStateFastHashTableFactory::CreateResizable(
+ hash_table_impl_type_,
+ group_by_types_,
+ estimated_num_entries_,
+ payload_sizes_,
+ handles_,
+ storage_manager_);
+ }
+
+ inline std::size_t setHashTableSize(const std::size_t overall_estimate,
+ const std::size_t num_partitions) const {
+ CHECK_NE(num_partitions, 0Lu);
+ // The minimum size of the hash table is set to 100.
+ return std::max(static_cast<std::size_t>(overall_estimate / num_partitions),
+ 100Lu);
+ }
+
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+ const std::size_t estimated_num_entries_;
+ const std::size_t num_partitions_;
+
+ const HashTableImplType hash_table_impl_type_;
+
+ const std::vector<const Type *> group_by_types_;
+
+ std::vector<std::size_t> payload_sizes_;
+
+ AggregationHandle *agg_handle_;
+ const std::vector<AggregationHandle *> handles_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index dd3e19d..ea74ee6 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1329,4 +1329,59 @@ const std::size_t StorageBlock::getNumTuples() const {
return tuple_store_->numTuples();
}
+// Collects the columns needed for a partitioned GROUP BY aggregation over
+// this block into 'temp_result': first the GROUP BY key columns, then the
+// aggregate argument columns. The attribute ID assigned to each column is
+// appended to 'key_ids' / 'argument_ids'. Nothing is inserted into a hash
+// table here, and 'num_partitions' is not referenced in this body.
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const TupleIdSequence *filter,
+    const std::size_t num_partitions,
+    ColumnVectorsValueAccessor *temp_result,
+    std::vector<attribute_id> *argument_ids,
+    std::vector<attribute_id> *key_ids,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+  DCHECK(!group_by.empty())
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+  std::unique_ptr<ValueAccessor> accessor(
+      tuple_store_->createValueAccessor(filter));
+
+  // Step 1: make sure 'reuse_group_by_vectors' holds one evaluated vector per
+  // GROUP BY expression. They are computed only when the caller passed an
+  // empty container; otherwise the precomputed vectors are used as they are.
+  if (reuse_group_by_vectors->empty()) {
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const auto &key_expression : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          key_expression->getAllValues(accessor.get(), &sub_blocks_ref));
+    }
+  } else {
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+  }
+
+  // Step 2: expose the key vectors through 'temp_result'. The vectors stay in
+  // 'reuse_group_by_vectors'; the 'false' flag presumably tells the accessor
+  // not to take ownership -- TODO confirm against addColumn().
+  attribute_id next_attr_id = 0;
+  for (const auto &key_vector : *reuse_group_by_vectors) {
+    temp_result->addColumn(key_vector.get(), false);
+    key_ids->push_back(next_attr_id++);
+  }
+
+  // Step 3: evaluate each aggregate's arguments and append them as further
+  // columns. An aggregate without arguments contributes no column and is
+  // recorded with kInvalidAttributeID instead.
+  for (const auto &aggregate_arguments : arguments) {
+    if (aggregate_arguments.empty()) {
+      argument_ids->push_back(kInvalidAttributeID);
+      continue;
+    }
+    for (const auto &argument_expression : aggregate_arguments) {
+      temp_result->addColumn(
+          argument_expression->getAllValues(accessor.get(), &sub_blocks_ref));
+      argument_ids->push_back(next_attr_id++);
+    }
+  }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 488efcc..56b3bdc 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -43,6 +43,7 @@ class AggregationHandle;
class AggregationState;
class CatalogRelationSchema;
class ColumnVector;
+class ColumnVectorsValueAccessor;
class InsertDestinationInterface;
class Predicate;
class Scalar;
@@ -451,6 +452,55 @@ class StorageBlock : public StorageBlockBase {
AggregationStateHashTableBase *hash_table,
std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
+ /**
+ * @brief Perform the GROUP BY aggregation for the case when aggregation is
+ * partitioned.
+ *
+ * TODO(harshad) - Refactor this class to use only one function
+ * aggregateGroupBy.
+ * @note The difference between this method and the aggregateGroupBy method
+ * is that in this method, the tuples are routed to different HashTables
+ * based on the partition to which they belong. The partition is
+ * determined by the GROUP BY attributes. Right now hash based
+ * partitioning is performed.
+ *
+ * @note This function only fills the caller-supplied
+ * ColumnVectorsValueAccessor needed for the insertion in the hash
+ * table. The actual insertion in respective hash tables should be
+ * handled by the caller. See
+ * AggregationOperationState::aggregateHashTable() for one such
+ * implementation.
+ *
+ * @param arguments The arguments to the aggregation function as Scalars.
+ * @param group_by The list of GROUP BY attributes/expressions. The tuples in
+ * this storage block are grouped by these attributes before
+ * aggregation.
+ * @param filter If non-NULL, then only tuple IDs which are set in the
+ * filter will be checked (all others will be assumed to be false).
+ * @param num_partitions The number of partitions used for the aggregation.
+ * NOTE(review): the implementation in StorageBlock.cpp does not
+ * currently reference this parameter.
+ * @param temp_result The ColumnVectorsValueAccessor used for collecting
+ * the attribute values from this StorageBlock. The GROUP BY key
+ * columns are added first, followed by the argument columns.
+ * @param argument_ids The attribute IDs used for the aggregation, which
+ * come from the arguments vector. For each aggregate whose argument
+ * list is empty, one invalid attribute ID is appended instead.
+ * @param key_ids The attribute IDs of the group by attributes.
+ * @param reuse_group_by_vectors This parameter is used to store and reuse
+ * GROUP BY attribute vectors pre-computed in an earlier invocation of
+ * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
+ * for ease of use. The current invocation of this method will reuse
+ * ColumnVectors if non-empty, otherwise computes ColumnVectors based
+ * on \c group_by and stores them in \c reuse_group_by_vectors.
+ **/
+ void aggregateGroupByPartitioned(
+ const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const TupleIdSequence *filter,
+ const std::size_t num_partitions,
+ ColumnVectorsValueAccessor *temp_result,
+ std::vector<attribute_id> *argument_ids,
+ std::vector<attribute_id> *key_ids,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
/**
* @brief Inserts the GROUP BY expressions and aggregation arguments together
* as keys into the distinctify hash table.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
quickstep_catalog_CatalogTypedefs
quickstep_types_TypedValue
quickstep_types_containers_Tuple_proto
+ quickstep_utility_CompositeHash
quickstep_utility_Macros)
target_link_libraries(quickstep_types_containers_Tuple_proto
quickstep_types_TypedValue_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
return attribute_values_.size();
}
+  /**
+   * @brief Compute a single hash over all attribute values of this tuple.
+   *
+   * The values in attribute_values_ are treated as one composite key and
+   * hashed with HashCompositeKey().
+   *
+   * @warning HashCompositeKey() DCHECKs that the key is not empty, so this
+   *          must not be called on a tuple that has no attributes.
+   *
+   * @return The composite hash of the tuple.
+   **/
+  std::size_t getTupleHash() const {
+    const std::size_t tuple_hash = HashCompositeKey(attribute_values_);
+    return tuple_hash;
+  }
+
private:
/**
* @brief Constructor which does not create any attributes, nor pre-reserve
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 395e264..e9be2ec 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -169,6 +169,7 @@ add_library(quickstep_utility_BloomFilter_proto
add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
@@ -230,6 +231,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
glog)
target_link_libraries(quickstep_utility_CheckSnprintf
glog)
+target_link_libraries(quickstep_utility_CompositeHash
+ quickstep_types_TypedValue
+ quickstep_utility_HashPair
+ glog)
target_link_libraries(quickstep_utility_DAG
glog
quickstep_utility_Macros)
@@ -325,6 +330,7 @@ target_link_libraries(quickstep_utility
quickstep_utility_CalculateInstalledMemory
quickstep_utility_Cast
quickstep_utility_CheckSnprintf
+ quickstep_utility_CompositeHash
quickstep_utility_DAG
quickstep_utility_DisjointTreeForest
quickstep_utility_EqualsAnyConstant
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * The hash of the first component seeds the result; the hash of each
+ * following component is then folded in, in order, with CombineHashes().
+ *
+ * @note Declared 'inline' rather than 'static' because it is defined in a
+ *       header: a 'static' definition would give every including
+ *       translation unit its own internal-linkage copy and produce
+ *       unused-function warnings wherever it is not called.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ *        Must not be empty; this is checked with DCHECK only.
+ * @return The hash value.
+ **/
+inline std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  std::vector<TypedValue>::const_iterator key_it = key.begin();
+  std::size_t hash = key_it->getHash();
+  for (++key_it; key_it != key.end(); ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_