You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/08/19 15:35:44 UTC

[2/5] incubator-quickstep git commit: Added function to aggregate group by with partition

Added function to aggregate group by with partition

- Added a function to compute a composite hash function for multiple
  attributes.
- Changes to Tuple class to support partitioning.
- A function in StorageBlock to compute aggregate with group by that
  supports partitioning.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e2014b4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e2014b4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e2014b4a

Branch: refs/heads/partitioned-aggregation
Commit: e2014b4a4a7d37f8823dc2b6a6a4eac285fd6b63
Parents: 3ae6c46
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Aug 18 12:30:18 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Thu Aug 18 12:30:18 2016 -0500

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp |  65 +++++++++++------
 storage/AggregationOperationState.hpp |   7 ++
 storage/PartitionedHashTablePool.hpp  |   4 +
 storage/StorageBlock.cpp              | 113 +++++++++++++++++++++++++++++
 storage/StorageBlock.hpp              |  10 +++
 types/containers/CMakeLists.txt       |   1 +
 types/containers/Tuple.hpp            |   8 ++
 utility/CMakeLists.txt                |   6 ++
 utility/CompositeHash.hpp             |  52 +++++++++++++
 9 files changed, 245 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 8019473..483a6be 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -68,7 +68,8 @@ AggregationOperationState::AggregationOperationState(
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
-    : input_relation_(input_relation),
+    : is_aggregate_partitioned_(estimated_num_entries > kPartitionedAggregateThreshold),
+      input_relation_(input_relation),
       predicate_(predicate),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
@@ -185,15 +186,27 @@ AggregationOperationState::AggregationOperationState(
 
     if (!group_by_handles.empty()) {
       // Aggregation with GROUP BY: create a HashTable pool for per-group states.
-      group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
-            new HashTablePool(estimated_num_entries,
-                              hash_table_impl_type,
-                              group_by_types,
-                              payload_sizes,
-                              group_by_handles,
-                              storage_manager)));
+      if (!is_aggregate_partitioned_) {
+        group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
+              new HashTablePool(estimated_num_entries,
+                                hash_table_impl_type,
+                                group_by_types,
+                                payload_sizes,
+                                group_by_handles,
+                                storage_manager)));
+        }
+      else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         kNumPartitionsForAggregate,
+                                         hash_table_impl_type,
+                                         group_by_types,
+                                         payload_sizes,
+                                         group_by_handles,
+                                         storage_manager));
       }
     }
+  }
 }
 
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
@@ -425,19 +438,29 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
     }
   }
 
-  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-  // directly into the (threadsafe) shared global HashTable for this
-  // aggregate.
-  DCHECK(group_by_hashtable_pools_[0] != nullptr);
-  AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
-  DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupByFast(arguments_,
-                              group_by_list_,
-                              predicate_.get(),
-                              agg_hash_table,
-                              &reuse_matches,
-                              &reuse_group_by_vectors);
-  group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+  if (!is_aggregate_partitioned_) {
+    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+    // directly into the (threadsafe) shared global HashTable for this
+    // aggregate.
+    DCHECK(group_by_hashtable_pools_[0] != nullptr);
+    AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+    DCHECK(agg_hash_table != nullptr);
+    block->aggregateGroupByFast(arguments_,
+                                group_by_list_,
+                                predicate_.get(),
+                                agg_hash_table,
+                                &reuse_matches,
+                                &reuse_group_by_vectors);
+    group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+  } else {
+    block->aggregateGroupByPartitioned(
+        arguments_,
+        group_by_list_,
+        predicate_.get(),
+        &reuse_matches,
+        &reuse_group_by_vectors,
+        partitioned_group_by_hashtable_pool_.get());
+  }
 }
 
 void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 34724a9..501252a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -169,6 +169,11 @@ class AggregationOperationState {
   int dflag;
 
  private:
+  static constexpr std::size_t kPartitionedAggregateThreshold = 100;
+  static constexpr std::size_t kNumPartitionsForAggregate = 40;
+
+  const bool is_aggregate_partitioned_;
+
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
   void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state);
@@ -217,6 +222,8 @@ class AggregationOperationState {
   // A vector of group by hash table pools, one for each group by clause.
   std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
 
+  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
   StorageManager *storage_manager_;
 
   void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
index a71af44..7f58fa9 100644
--- a/storage/PartitionedHashTablePool.hpp
+++ b/storage/PartitionedHashTablePool.hpp
@@ -143,6 +143,10 @@ class PartitionedHashTablePool {
     return &hash_tables_;
   }
 
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
  private:
   void initializeAllHashTables() {
     for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 7c8c101..cdab0ff 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -41,6 +41,7 @@
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/SMAIndexSubBlock.hpp"
 #include "storage/SplitRowStoreTupleStorageSubBlock.hpp"
 #include "storage/StorageBlockBase.hpp"
@@ -1450,4 +1451,116 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const Predicate *predicate,
+    std::unique_ptr<TupleIdSequence> *reuse_matches,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+    PartitionedHashTablePool *hashtable_pool) const {
+  DCHECK_EQ(group_by.size(), 0u)
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+  std::vector<attribute_id> arg_ids;
+  std::vector<std::vector<attribute_id>> argument_ids;
+
+  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+  std::vector<attribute_id> key_ids;
+
+  // An intermediate ValueAccessor that stores the materialized 'arguments' for
+  // this aggregate, as well as the GROUP BY expression values.
+  ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ValueAccessor> accessor;
+  if (predicate) {
+    if (!*reuse_matches) {
+      // If there is a filter predicate that hasn't already been evaluated,
+      // evaluate it now and save the results for other aggregates on this
+      // same block.
+      reuse_matches->reset(getMatchesForPredicate(predicate));
+    }
+
+    // Create a filtered ValueAccessor that only iterates over predicate
+    // matches.
+    accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+  } else {
+    // Create a ValueAccessor that iterates over all tuples in this block
+    accessor.reset(tuple_store_->createValueAccessor());
+  }
+
+  attribute_id attr_id = 0;
+
+  // First, put GROUP BY keys into 'temp_result'.
+  if (reuse_group_by_vectors->empty()) {
+    // Compute GROUP BY values from group_by Scalars, and store them in
+    // reuse_group_by_vectors for reuse by other aggregates on this same
+    // block.
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  } else {
+    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+      temp_result.addColumn(reuse_cv.get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  }
+
+  // Compute argument vectors and add them to 'temp_result'.
+  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+    arg_ids.clear();
+    for (const std::unique_ptr<const Scalar> &args : argument) {
+      temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+      arg_ids.push_back(attr_id++);
+    }
+    argument_ids.push_back(arg_ids);
+  }
+
+  // Compute the partitions for the tuple formed by group by values.
+  std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+  partition_membership.resize(hashtable_pool->getNumPartitions());
+
+  // Create a tuple-id sequence for each partition.
+  for (std::size_t partition = 0;
+       partition < hashtable_pool->getNumPartitions();
+       ++partition) {
+    partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition()));
+  }
+
+  // Iterate over ValueAccessor for each tuple,
+  // set a bit in the appropriate TupleIdSequence.
+  temp_result.beginIteration();
+  while (temp_result.next()) {
+    const std::size_t curr_tuple_partition_id =
+        temp_result.getTupleWithAttributes(key_ids)->getTupleHash() %
+        hashtable_pool->getNumPartitions();
+    partition_membership[curr_tuple_partition_id]->set(
+        temp_result.getCurrentPosition(), true);
+  }
+  // For each partition, create an adapter around Value Accessor and
+  // TupleIdSequence.
+  std::vector<std::unique_ptr<
+      TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+  adapter.resize(hashtable_pool->getNumPartitions());
+  for (std::size_t partition = 0;
+       partition < hashtable_pool->getNumPartitions();
+       ++partition) {
+    adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+        *partition_membership[partition]));
+    hashtable_pool->getHashTable(partition)
+        ->upsertValueAccessorCompositeKeyFast(
+            argument_ids, adapter[partition].get(), key_ids, true);
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 8a0bd42..4014dda 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -42,6 +42,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class InsertDestinationInterface;
+class PartitionedHashTablePool;
 class Predicate;
 class Scalar;
 class StorageBlockLayout;
@@ -474,6 +475,15 @@ class StorageBlock : public StorageBlockBase {
                         std::unique_ptr<TupleIdSequence> *reuse_matches,
                         std::vector<std::unique_ptr<ColumnVector>>
                             *reuse_group_by_vectors) const;
+
+  void aggregateGroupByPartitioned(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const Predicate *predicate,
+      std::unique_ptr<TupleIdSequence> *reuse_matches,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+      PartitionedHashTablePool *hashtable_pool) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index 20c9e6c..9b67c6e 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -47,6 +47,7 @@ target_link_libraries(quickstep_types_containers_Tuple
                       quickstep_catalog_CatalogTypedefs
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple_proto
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 7e62c8f..62da8dd 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -216,6 +217,13 @@ class Tuple {
     return attribute_values_.size();
   }
 
+  /**
+   * @brief Get the hash value of the tuple.
+   **/
+  std::size_t getTupleHash() const {
+    return HashCompositeKey(attribute_values_);
+  }
+
  private:
   /**
    * @brief Constructor which does not create any attributes, nor pre-reserve

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 2d3db8f..0b17116 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -165,6 +165,7 @@ add_library(quickstep_utility_BloomFilter_proto
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
 add_library(quickstep_utility_Glob Glob.cpp Glob.hpp)
@@ -222,6 +223,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_CompositeHash
+                      quickstep_types_TypedValue
+                      quickstep_utility_HashPair
+                      glog)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -301,6 +306,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Glob

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2014b4a/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ * @return The hash value.
+ **/
+static std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_