You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/06 20:16:43 UTC
[66/73] [abbrv] incubator-quickstep git commit: Created method for
partition-aware finalize aggregate.
Created method for partition-aware finalize aggregate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/07afae28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/07afae28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/07afae28
Branch: refs/heads/partitioned-aggregation
Commit: 07afae2849cbeb1cfa271d52df4482e39740dcc8
Parents: b22323e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Aug 18 16:54:38 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Sep 6 15:01:30 2016 -0500
----------------------------------------------------------------------
storage/AggregationOperationState.cpp | 69 +++++++++++++++++++++++++++++-
storage/AggregationOperationState.hpp | 33 +++++++++++++-
2 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07afae28/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c39e98a..6b4a672 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -566,8 +566,14 @@ void AggregationOperationState::finalizeHashTable(
}
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
- ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *agg_hash_table, &group_by_keys, agg_idx);
+ // TODO(harshad) - Modify the finalizeHashTable() function called below such
+ // that group_by_keys is a single ColumnVectorsValueAccessor in which there
+ // is one ColumnVector per GROUP BY key. If we do that, the code below
+ // for reorganizing group_by_keys can be removed.
+ ColumnVector* agg_result_col =
+ handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
+ &group_by_keys,
+ agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
@@ -618,4 +624,63 @@ void AggregationOperationState::finalizeHashTable(
output_destination->bulkInsertTuples(&complete_result);
}
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  // Finalizes a single partition: every aggregate handle is finalized against
+  // the partition's hash table, the GROUP BY keys are transposed into column
+  // vectors, and keys + aggregate results are bulk-inserted into
+  // 'output_destination'.
+  //
+  // Callers are expected to invoke this only when isAggregatePartitioned() is
+  // true (getNumPartitions() likewise only consults the partitioned hash table
+  // pool in that case).
+  DCHECK(is_aggregate_partitioned_);
+  DCHECK_LT(partition_id, getNumPartitions());
+
+  // The hash table depends only on 'partition_id', not on the aggregate being
+  // finalized, so fetch it once instead of once per aggregate handle.
+  AggregationStateHashTableBase *hash_table =
+      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+  DCHECK(hash_table != nullptr);
+
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  group_by_cvs.reserve(group_by_list_.size());
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(group_by_type, group_by_keys.size());
+      // Ownership passes to 'group_by_cvs' immediately.
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(group_by_type, group_by_keys.size());
+      // Ownership passes to 'group_by_cvs' immediately.
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates. addColumn() takes ownership of each column.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07afae28/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 7e8acb5..37d77e3 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -168,8 +168,37 @@ class AggregationOperationState {
**/
void finalizeAggregate(InsertDestination *output_destination);
- static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
- AggregationStateHashTableBase *dst);
+  /**
+   * @brief Produce the final results of the aggregates managed by this
+   *        AggregationOperationState for one partition, and write them out
+   *        to StorageBlock(s).
+   *
+   * @note Intended for partitioned aggregation, i.e. when
+   *       isAggregatePartitioned() returns true.
+   *
+   * @param partition_id The ID of the partition to finalize; expected to be
+   *        less than getNumPartitions().
+   * @param output_destination The InsertDestination that receives the
+   *        finalized output tuple(s) for this partition.
+   **/
+  void finalizeAggregatePartitioned(
+      const std::size_t partition_id,
+      InsertDestination *output_destination);
+
+  /**
+   * @brief Check whether this aggregation is partitioned.
+   *
+   * @return true if the aggregation is partitioned, false otherwise.
+   **/
+  bool isAggregatePartitioned() const { return is_aggregate_partitioned_; }
+
+  /**
+   * @brief Report how many partitions this aggregation is split into.
+   *
+   * @note Only a partitioned aggregation (isAggregatePartitioned() is true)
+   *       consults the partitioned hash table pool; otherwise the answer is
+   *       always 1.
+   *
+   * @return The partition count of the partitioned hash table pool when the
+   *         aggregation is partitioned, and 1 otherwise.
+   **/
+  std::size_t getNumPartitions() const {
+    if (!is_aggregate_partitioned_) {
+      return 1;
+    }
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+  }
int dflag;