You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/23 17:37:53 UTC

incubator-quickstep git commit: Initial commit

Repository: incubator-quickstep
Updated Branches:
  refs/heads/parallel-distinct-agg [created] a7e228669


Initial commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a7e22866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a7e22866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a7e22866

Branch: refs/heads/parallel-distinct-agg
Commit: a7e228669900d9ae9b9f7c639fc33b0a6dcbdabd
Parents: 4be8e91
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Feb 20 20:05:08 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Feb 23 11:34:18 2017 -0600

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 49 ++++++++++++++++++------------
 storage/AggregationOperationState.hpp |  3 ++
 storage/PackedPayloadHashTable.hpp    | 21 ++++++++-----
 3 files changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a7e22866/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..e9a86ce 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -19,8 +19,10 @@
 
 #include "storage/AggregationOperationState.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 #include <utility>
@@ -87,6 +89,8 @@ AggregationOperationState::AggregationOperationState(
       is_aggregate_partitioned_(false),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
+      all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
+                                    true, std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
     if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
@@ -188,7 +192,7 @@ AggregationOperationState::AggregationOperationState(
           AggregationStateHashTableFactory::CreateResizable(
               *distinctify_hash_table_impl_types_it,
               key_types,
-              estimated_num_entries,
+              estimated_num_entries * 1000000u,
               {} /* handles */,
               storage_manager));
       ++distinctify_hash_table_impl_types_it;
@@ -621,13 +625,15 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
     }
   }
 
-  AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTable();
+  if (!all_distinct_) {
+    AggregationStateHashTableBase *agg_hash_table =
+        group_by_hashtable_pool_->getHashTable();
 
-  agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
-                                                  group_by_key_ids_,
-                                                  accessor_mux);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+                                                    group_by_key_ids_,
+                                                    accessor_mux);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  }
 }
 
 void AggregationOperationState::finalizeAggregate(
@@ -790,19 +796,24 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
 
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  DCHECK(hash_tables != nullptr);
-  if (hash_tables->empty()) {
-    return;
-  }
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr;
 
-  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
-      hash_tables->back().release());
-  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
-    std::unique_ptr<AggregationStateHashTableBase> hash_table(
-        hash_tables->at(i).release());
-    mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
-    hash_table->destroyPayload();
+  if (all_distinct_) {
+    final_hash_table_ptr.reset(group_by_hashtable_pool_->getHashTable());
+  } else {
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+    DCHECK(hash_tables != nullptr);
+    if (hash_tables->empty()) {
+      return;
+    }
+
+    final_hash_table_ptr.reset(hash_tables->back().release());
+    for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+      std::unique_ptr<AggregationStateHashTableBase> hash_table(
+          hash_tables->at(i).release());
+      mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+      hash_table->destroyPayload();
+    }
   }
 
   PackedPayloadHashTable *final_hash_table =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a7e22866/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..6c9690a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -273,6 +273,9 @@ class AggregationOperationState {
   // arguments.
   std::vector<bool> is_distinct_;
 
+  // A flag indicating whether all aggregate functions are DISTINCT aggregations.
+  const bool all_distinct_;
+
   // Non-trivial group-by/argument expressions that need to be evaluated.
   std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a7e22866/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..ac127a0 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -336,6 +336,7 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
                                        const std::uint8_t **value,
                                        std::size_t *entry_num) const;
 
+  template <bool key_only = false>
   inline std::uint8_t* upsertCompositeKeyInternal(
       const std::vector<TypedValue> &key,
       const std::size_t variable_key_size);
@@ -763,7 +764,7 @@ inline bool PackedPayloadHashTable::upsertCompositeKey(
   }
 }
 
-
+template <bool key_only>
 inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
     const std::vector<TypedValue> &key,
     const std::size_t variable_key_size) {
@@ -809,7 +810,9 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
   writeCompositeKeyToBucket(key, hash_code, bucket);
 
   std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
+  if (!key_only) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  }
 
   // Update the previous chaing pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -825,13 +828,14 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
     const std::vector<MultiSourceAttributeId> &key_ids,
     ValueAccessor *base_accessor,
     ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
+//  std::size_t variable_size;
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_ids.size());
 
   return InvokeOnAnyValueAccessor(
       base_accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+    const bool key_only = (num_handles_ == 0);
     bool continuing = true;
     while (continuing) {
       {
@@ -848,13 +852,14 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
                   &key_vector)) {
             continue;
           }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
+//          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          std::uint8_t *value = this->template upsertCompositeKeyInternal<true>(
+//              key_vector, variable_size, key_only);
+              key_vector, 0);
           if (value == nullptr) {
             continuing = true;
             break;
-          } else {
+          } else if (!key_only) {
             SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
             for (unsigned int k = 0; k < num_handles_; ++k) {
               const auto &ids = argument_ids[k];
@@ -876,7 +881,7 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
         }
       }
       if (continuing) {
-        this->resize(0, variable_size);
+        this->resize(0, 100u);
         accessor->previous();
         if (use_two_accessors) {
           derived_accessor->previous();