You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/06 20:16:42 UTC
[65/73] [abbrv] incubator-quickstep git commit: Added function to
aggregate group by with partition
Added function to aggregate group by with partition
- Added a function to compute a composite hash function for multiple
attributes.
- Changes to Tuple class to support partitioning.
- A function in StorageBlock to compute aggregate with group by that
supports partitioning.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b22323e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b22323e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b22323e7
Branch: refs/heads/partitioned-aggregation
Commit: b22323e7be97055f4e98c061731005420aca4ea6
Parents: 6fa0e29
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Aug 18 12:30:18 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 6 14:56:12 2016 -0500
----------------------------------------------------------------------
storage/AggregationOperationState.cpp | 69 ++++++++++++------
storage/AggregationOperationState.hpp | 7 ++
storage/PartitionedHashTablePool.hpp | 4 +
storage/StorageBlock.cpp | 113 +++++++++++++++++++++++++++++
storage/StorageBlock.hpp | 10 +++
types/containers/CMakeLists.txt | 1 +
types/containers/Tuple.hpp | 8 ++
utility/CMakeLists.txt | 6 ++
utility/CompositeHash.hpp | 52 +++++++++++++
9 files changed, 246 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c5f59f9..c39e98a 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -68,7 +68,8 @@ AggregationOperationState::AggregationOperationState(
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
- : input_relation_(input_relation),
+ : is_aggregate_partitioned_(estimated_num_entries > kPartitionedAggregateThreshold),
+ input_relation_(input_relation),
predicate_(predicate),
group_by_list_(std::move(group_by)),
arguments_(std::move(arguments)),
@@ -194,15 +195,26 @@ AggregationOperationState::AggregationOperationState(
}
if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group
- // states.
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
- new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager)));
+ // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ if (!is_aggregate_partitioned_) {
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager)));
+ }
+ else {
+ partitioned_group_by_hashtable_pool_.reset(
+ new PartitionedHashTablePool(estimated_num_entries,
+ kNumPartitionsForAggregate,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager));
+ }
}
}
}
@@ -441,20 +453,29 @@ void AggregationOperationState::aggregateBlockHashTable(
}
}
- // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
- // directly into the (threadsafe) shared global HashTable for this
- // aggregate.
- DCHECK(group_by_hashtable_pools_[0] != nullptr);
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pools_[0]->getHashTableFast();
- DCHECK(agg_hash_table != nullptr);
- block->aggregateGroupByFast(arguments_,
- group_by_list_,
- predicate_.get(),
- agg_hash_table,
- &reuse_matches,
- &reuse_group_by_vectors);
- group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+ if (!is_aggregate_partitioned_) {
+ // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+ // directly into the (threadsafe) shared global HashTable for this
+ // aggregate.
+ DCHECK(group_by_hashtable_pools_[0] != nullptr);
+ AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+ DCHECK(agg_hash_table != nullptr);
+ block->aggregateGroupByFast(arguments_,
+ group_by_list_,
+ predicate_.get(),
+ agg_hash_table,
+ &reuse_matches,
+ &reuse_group_by_vectors);
+ group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+ } else {
+ block->aggregateGroupByPartitioned(
+ arguments_,
+ group_by_list_,
+ predicate_.get(),
+ &reuse_matches,
+ &reuse_group_by_vectors,
+ partitioned_group_by_hashtable_pool_.get());
+ }
}
void AggregationOperationState::finalizeSingleState(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 66af517..7e8acb5 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -174,6 +174,11 @@ class AggregationOperationState {
int dflag;
private:
+ static constexpr std::size_t kPartitionedAggregateThreshold = 100;
+ static constexpr std::size_t kNumPartitionsForAggregate = 40;
+
+ const bool is_aggregate_partitioned_;
+
// Merge locally (per storage block) aggregated states with global aggregation
// states.
void mergeSingleState(
@@ -225,6 +230,8 @@ class AggregationOperationState {
// A vector of group by hash table pools, one for each group by clause.
std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
+ std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
index a71af44..7f58fa9 100644
--- a/storage/PartitionedHashTablePool.hpp
+++ b/storage/PartitionedHashTablePool.hpp
@@ -143,6 +143,10 @@ class PartitionedHashTablePool {
return &hash_tables_;
}
+ inline std::size_t getNumPartitions() const {
+ return num_partitions_;
+ }
+
private:
void initializeAllHashTables() {
for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 8ff18b5..06daff6 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -41,6 +41,7 @@
#include "storage/IndexSubBlock.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
#include "storage/SMAIndexSubBlock.hpp"
#include "storage/SplitRowStoreTupleStorageSubBlock.hpp"
#include "storage/StorageBlockBase.hpp"
@@ -1450,4 +1451,116 @@ const std::size_t StorageBlock::getNumTuples() const {
return tuple_store_->numTuples();
}
+void StorageBlock::aggregateGroupByPartitioned(
+ const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+ PartitionedHashTablePool *hashtable_pool) const {
+ DCHECK_EQ(group_by.size(), 0u)
+ << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+ SubBlocksReference sub_blocks_ref(*tuple_store_,
+ indices_,
+ indices_consistent_);
+
+ // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+ std::vector<attribute_id> arg_ids;
+ std::vector<std::vector<attribute_id>> argument_ids;
+
+ // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+ std::vector<attribute_id> key_ids;
+
+ // An intermediate ValueAccessor that stores the materialized 'arguments' for
+ // this aggregate, as well as the GROUP BY expression values.
+ ColumnVectorsValueAccessor temp_result;
+ std::unique_ptr<ValueAccessor> accessor;
+ if (predicate) {
+ if (!*reuse_matches) {
+ // If there is a filter predicate that hasn't already been evaluated,
+ // evaluate it now and save the results for other aggregates on this
+ // same block.
+ reuse_matches->reset(getMatchesForPredicate(predicate));
+ }
+
+ // Create a filtered ValueAccessor that only iterates over predicate
+ // matches.
+ accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+ } else {
+ // Create a ValueAccessor that iterates over all tuples in this block
+ accessor.reset(tuple_store_->createValueAccessor());
+ }
+
+ attribute_id attr_id = 0;
+
+ // First, put GROUP BY keys into 'temp_result'.
+ if (reuse_group_by_vectors->empty()) {
+ // Compute GROUP BY values from group_by Scalars, and store them in
+ // reuse_group_by_vectors for reuse by other aggregates on this same
+ // block.
+ reuse_group_by_vectors->reserve(group_by.size());
+ for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+ reuse_group_by_vectors->emplace_back(
+ group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+ temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ } else {
+ // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+ DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+ << "Wrong number of reuse_group_by_vectors";
+ for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+ temp_result.addColumn(reuse_cv.get(), false);
+ key_ids.push_back(attr_id++);
+ }
+ }
+
+ // Compute argument vectors and add them to 'temp_result'.
+ for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+ arg_ids.clear();
+ for (const std::unique_ptr<const Scalar> &args : argument) {
+ temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+ arg_ids.push_back(attr_id++);
+ }
+ argument_ids.push_back(arg_ids);
+ }
+
+ // Compute the partitions for the tuple formed by group by values.
+ std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+ partition_membership.resize(hashtable_pool->getNumPartitions());
+
+ // Create a tuple-id sequence for each partition.
+ for (std::size_t partition = 0;
+ partition < hashtable_pool->getNumPartitions();
+ ++partition) {
+ partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition()));
+ }
+
+ // Iterate over ValueAccessor for each tuple,
+ // set a bit in the appropriate TupleIdSequence.
+ temp_result.beginIteration();
+ while (temp_result.next()) {
+ const std::size_t curr_tuple_partition_id =
+ temp_result.getTupleWithAttributes(key_ids)->getTupleHash() %
+ hashtable_pool->getNumPartitions();
+ partition_membership[curr_tuple_partition_id]->set(
+ temp_result.getCurrentPosition(), true);
+ }
+ // For each partition, create an adapter around Value Accessor and
+ // TupleIdSequence.
+ std::vector<std::unique_ptr<
+ TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+ adapter.resize(hashtable_pool->getNumPartitions());
+ for (std::size_t partition = 0;
+ partition < hashtable_pool->getNumPartitions();
+ ++partition) {
+ adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+ *partition_membership[partition]));
+ hashtable_pool->getHashTable(partition)
+ ->upsertValueAccessorCompositeKeyFast(
+ argument_ids, adapter[partition].get(), key_ids, true);
+ }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 8b59a3c..9603f38 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
class CatalogRelationSchema;
class ColumnVector;
class InsertDestinationInterface;
+class PartitionedHashTablePool;
class Predicate;
class Scalar;
class StorageBlockLayout;
@@ -476,6 +477,15 @@ class StorageBlock : public StorageBlockBase {
std::unique_ptr<TupleIdSequence> *reuse_matches,
std::vector<std::unique_ptr<ColumnVector>>
*reuse_group_by_vectors) const;
+
+ void aggregateGroupByPartitioned(
+ const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+ const std::vector<std::unique_ptr<const Scalar>> &group_by,
+ const Predicate *predicate,
+ std::unique_ptr<TupleIdSequence> *reuse_matches,
+ std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+ PartitionedHashTablePool *hashtable_pool) const;
+
/**
* @brief Inserts the GROUP BY expressions and aggregation arguments together
* as keys into the distinctify hash table.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
quickstep_catalog_CatalogTypedefs
quickstep_types_TypedValue
quickstep_types_containers_Tuple_proto
+ quickstep_utility_CompositeHash
quickstep_utility_Macros)
target_link_libraries(quickstep_types_containers_Tuple_proto
quickstep_types_TypedValue_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
return attribute_values_.size();
}
+ /**
+ * @brief Get the hash value of the tuple.
+ **/
+ std::size_t getTupleHash() const {
+ return HashCompositeKey(attribute_values_);
+ }
+
private:
/**
* @brief Constructor which does not create any attributes, nor pre-reserve
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ddaae45..4fb6e5b 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -167,6 +167,7 @@ add_library(quickstep_utility_BloomFilter_proto
add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
add_library(quickstep_utility_ExecutionDAGVisualizer
@@ -227,6 +228,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
glog)
target_link_libraries(quickstep_utility_CheckSnprintf
glog)
+target_link_libraries(quickstep_utility_CompositeHash
+ quickstep_types_TypedValue
+ quickstep_utility_HashPair
+ glog)
target_link_libraries(quickstep_utility_DAG
glog
quickstep_utility_Macros)
@@ -318,6 +323,7 @@ target_link_libraries(quickstep_utility
quickstep_utility_CalculateInstalledMemory
quickstep_utility_Cast
quickstep_utility_CheckSnprintf
+ quickstep_utility_CompositeHash
quickstep_utility_DAG
quickstep_utility_EqualsAnyConstant
quickstep_utility_ExecutionDAGVisualizer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b22323e7/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ * @return The hash value.
+ **/
+static std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+ DCHECK(!key.empty());
+ std::size_t hash = key.front().getHash();
+ for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+ key_it != key.end();
+ ++key_it) {
+ hash = CombineHashes(hash, key_it->getHash());
+ }
+ return hash;
+}
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_